Commit 081cb43e authored by Luc Rajer's avatar Luc Rajer
Browse files

Merge branch 'development' into 'master'


See merge request source/tmtc_core!13
parents b4370be1 390c2153
Copyright 2020 KSat e.V. Stuttgart
Modifications copyright (C) 2020 Robin Mueller for the FSFW project
Modifications copyright (C) 2020 Robin Mueller for the EIVE project
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
......@@ -10,4 +13,4 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
\ No newline at end of file
limitations under the License.
......@@ -46,7 +46,7 @@ but this is not mandatory.
It is recommended to fork this repository for new missions in a separate folder
which will be the primary folder for the TMTC commander software.
In this example, we will clone this repository as a submodule in a `tmtc` folder and we
In this example, we will clone this repository as a submodule in a `tmtc` folder, and we
will assume that the `tmtc` folder is located inside a folder called `obsw` which also contains
the primary OBSW. The following steps will be shown in the command line to be more generic.
......@@ -114,11 +114,11 @@ display options.
The PyCharm IDE can be used to comfortably manage a set of run configuations
(for example tests for different services). These configurations were shared
through the version control system git and should be imported automatically.
If these configurations dont show up, try to open the tmtc folder as
If these configurations don't show up, try to open the tmtc folder as
a new PyCharm project in a new window.
To add new configurations, go to Edit Configurations...
at the top right corner in the drop-down menu.
in the top right corner in the drop-down menu.
Specify the new run configurations and set a tick at Share through VCS.
### Examples
......@@ -161,22 +161,26 @@ without worrying about the used communication interface.
Serial communication was implemented and is tested for Windows 10 and Ubuntu 20.04.
It requires the PySerial package installed.
It should be noted that there are several modes for the serial communication.
There is a fixed frame mode and a mode based on a simple DLE transport layer.
There is a fixed frame mode, and a mode based on a simple DLE transport layer.
When using the DLE transport layer, sent packets are encoded with DLE while
received packets need to be DLE encoded.
## Issues
## Common Issues
### Ethernet Communication
If there are issued with the Ethernet communcation,
there might be a problematic firewall setting.
It might be necessary to allow UDP packets on certain ports
### PyCharm Problems
If PyCharm does not display console output, make sure the system interpreter
is set to `python.exe` and not `pythonw.exe`.
## Developers Information
Code Style: [PEP8](
Can be enforced/checked by using Pylint as an external program in PyCharm.
Install it with pip and then install and set-up the Pylint plugin in PyCharm.
Install it with pip and then install and set up the Pylint plugin in PyCharm.
There are a lot of features which would be nice, for example a GUI.
The architecture of the program should allow extension like that without
......@@ -84,7 +84,7 @@ class EthernetComIF(CommunicationInterface):
return True, packet_list
return False, []
def receive_telemetry(self, parameters: any = 0) -> list:
def receive_telemetry(self, parameters: any = 0) -> PusTmListT:
packet_list = []
if self.udp_socket is None:
return packet_list
......@@ -112,8 +112,9 @@ class EthernetComIF(CommunicationInterface):
except OSError:
print("Socket already set-up.")
except TypeError:
print("Invalid Receive Address")
except TypeError as error:
print("Invalid Receive Address.")
def connect_to_board(self):
......@@ -74,7 +74,7 @@ def prompt_ip_addresses():
send_port = input(
"Please enter destination port: "
send_address = (send_ip, send_port)
send_address = (send_ip, int(send_port))"Specified destination IP address: {send_ip}")"Specified destination port: {send_port}")
......@@ -104,7 +104,7 @@ def prompt_ip_addresses():
receive_port = input("Please enter receive port: ")
recv_address = (receive_ip, receive_port)
recv_address = (receive_ip, int(receive_port))"Specified receive IP address: {receive_ip}")"Specified receive destination port: {receive_port}")
......@@ -24,7 +24,6 @@ import re
import errno
import sys
import time
from asyncio import Future
from collections import deque
from threading import Thread
from tmtc_core.com_if.tmtcc_com_interface_base import CommunicationInterface, PusTcInfoT, PusTmListT
......@@ -13,3 +13,11 @@ class ComInterfaces(enum.Enum):
Serial = 1
EthernetUDP = 2
QEMU = 5
class QueueCommands(enum.Enum):
......@@ -43,7 +43,8 @@ class TmTcFrontend(QMainWindow):
def __init__(self, init_com_if: ComInterfaces, init_mode: ModeList, init_service: ServiceList):
super(TmTcFrontend, self).__init__()
self.tmtc_handler = TmTcHandler(init_com_if=init_com_if, init_mode=init_mode, init_service=init_service)
self.tmtc_handler = TmTcHandler(init_com_if=init_com_if, init_mode=init_mode,
# TODO: Perform initialization on button press with specified ComIF
# Also, when changing ComIF, ensure that old ComIF is closed (e.g. with printout)
# Lock all other elements while ComIF is invalid.
......@@ -379,7 +380,6 @@ def ip_change_client(value):
ethernet_config[EthernetConfig.RECV_ADDRESS] = value
update_global(GlobalIds.ETHERNET_CONFIG, ethernet_config)"Client IP changed: " + value)
# tmtcc_config.G_REC_ADDRESS = (value, 2008)
def ip_change_board(value):
......@@ -388,4 +388,3 @@ def ip_change_board(value):
ethernet_config[EthernetConfig.SEND_ADDRESS] = value
update_global(GlobalIds.ETHERNET_CONFIG, ethernet_config)"Board IP changed: " + value)
# tmtcc_config.G_SEND_ADDRESS = (value, 7)
\ No newline at end of file
......@@ -187,7 +187,8 @@ class TmTcHandler:
from config.tmtcc_user_mode_op import perform_mode_operation_user
perform_mode_operation_user(self, self.mode)
except ImportError:
except ImportError as error:
LOGGER.error("Custom mode handling module not provided!")
def __core_operation(self, one_shot):
......@@ -13,7 +13,6 @@ from tmtc_core.com_if.tmtcc_ethernet_com_if import EthernetComIF
from tmtc_core.com_if.tmtcc_qemu_com_if import QEMUComIF
from tmtc_core.com_if.tmtcc_serial_com_if import SerialCommunicationType, SerialComIF
from tmtc_core.com_if.tmtcc_serial_utilities import determine_com_port
from tmtc_core.com_if.tmtcc_ethernet_utilities import determine_ip_addresses
from tmtc_core.utility.tmtcc_logger import get_logger
from tmtc_core.utility.tmtcc_tmtc_printer import TmTcPrinter
import sys
from config.tmtcc_globals import GlobalIds
class GlobalsManager:
Global object manager. Only one global instance should be created.
The instance can be retrieved with the get_manager class method.
import struct
import sys
from config.tmtcc_object_ids import ObjectIds
from tmtc_core.utility.tmtcc_logger import get_logger
logger = get_logger()
class ObjectIdManager:
......@@ -15,10 +19,9 @@ class ObjectIdManager:
Retrieve a handle to the global object ID manager.
if cls.MANAGER_INSTANCE is None:
cls.MANAGER_INSTANCE = ObjectIdManager()
cls.MANAGER_INSTANCE = ObjectIdManager()
def __init__(self):
self.object_id_dict = dict()
self.object_ids_instantiated = False
......@@ -31,8 +34,6 @@ class ObjectIdManager:
object_id = self.object_id_dict.get(object_id_key)
if object_id is None:
from tmtc_core.utility.tmtcc_logger import get_logger
logger = get_logger()
logger.error("This key does not exist in the object ID dictionary!")
except ImportError:
print("Could not import logger!")
......@@ -40,6 +41,12 @@ class ObjectIdManager:
return object_id
def get_key_from_raw_object_id(self, object_id: bytearray):
for key, raw_id in self.object_id_dict.items():
if raw_id == object_id:
return key
return None
def __set_object_ids(self):
from config.tmtcc_object_ids import set_object_ids
......@@ -49,7 +56,30 @@ class ObjectIdManager:
logger = get_logger()
logger.error("Could not import set_object_ids")
except AttributeError:
from tmtc_core.utility.tmtcc_logger import get_logger
logger = get_logger()
logger.error("Please ensure that the object ID keys are defined as well!")
def get_object_id(object_id_key: ObjectIds):
return ObjectIdManager.get_manager().get_object_id(object_id_key)
def get_key_from_raw_object_id(object_id_raw: bytearray) -> ObjectIds:
if not isinstance(object_id_raw, bytearray):
logger.warning("Invalid object ID type.")
return ObjectIds.INVALID
if len(object_id_raw) != 4:
logger.warning("Invalid object ID length")
return ObjectIds.INVALID
return ObjectIdManager.get_manager().get_key_from_raw_object_id(object_id_raw)
def get_key_from_int_object_id(object_id_int: int) -> ObjectIds:
if not isinstance(object_id_int, int):
logger.warning("Invalid object ID type.")
return ObjectIds.INVALID
object_id_raw = bytearray(struct.pack("!I", object_id_int))
return ObjectIdManager.get_manager().get_key_from_raw_object_id(object_id_raw)
......@@ -4,4 +4,4 @@
SW_NAME = "tmtcc"
......@@ -8,6 +8,7 @@ import sys
from enum import Enum
from typing import Dict, Union, Tuple, Deque
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.utility.tmtcc_logger import get_logger
LOGGER = get_logger()
......@@ -31,7 +32,7 @@ class TcDictionaryKeys(Enum):
PusTcInfo = Dict[TcDictionaryKeys, any]
PusTcInfoT = Union[PusTcInfo, None]
PusTcTupleT = Tuple[bytearray, PusTcInfoT]
TcAuxiliaryTupleT = Tuple[str, any]
TcAuxiliaryTupleT = Tuple[QueueCommands, any]
TcQueueEntryT = Union[TcAuxiliaryTupleT, PusTcTupleT]
TcQueueT = Deque[TcQueueEntryT]
PusTcInfoQueueT = Deque[PusTcInfoT]
......@@ -7,7 +7,6 @@ This file transfers TC packing to the user application.
@date 01.11.2019
import sys
from typing import Union
from config.tmtcc_definitions import ServiceList
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT
import enum
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, TcQueueT
class Srv17Subservices(enum.IntEnum):
def pack_service17_ping_command(ssc: int) -> PusTelecommand:
return PusTelecommand(service=17, subservice=1, ssc=ssc)
return PusTelecommand(service=17, subservice=Srv17Subservices.PING_CMD, ssc=ssc)
def pack_generic_service17_test(init_ssc:int, tc_queue: TcQueueT) -> int:
def pack_generic_service17_test(init_ssc: int, tc_queue: TcQueueT) -> int:
new_ssc = init_ssc
tc_queue.appendleft(("print", "Testing Service 17"))
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17"))
# ping test
tc_queue.appendleft(("print", "Testing Service 17: Ping Test"))
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Ping Test"))
new_ssc += 1
# enable event
tc_queue.appendleft(("print", "Testing Service 17: Enable Event"))
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Enable Event"))
command = PusTelecommand(service=5, subservice=5, ssc=new_ssc)
new_ssc += 1
# test event
tc_queue.appendleft(("print", "Testing Service 17: Trigger event"))
command = PusTelecommand(service=17, subservice=128, ssc=new_ssc)
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Trigger event"))
command = PusTelecommand(service=17, subservice=Srv17Subservices.GEN_EVENT, ssc=new_ssc)
new_ssc += 1
# invalid subservice
tc_queue.appendleft(("print", "Testing Service 17: Invalid subservice"))
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Invalid subservice"))
command = PusTelecommand(service=17, subservice=243, ssc=new_ssc)
new_ssc += 1
@brief Core components for mode commanding (custom PUS service)
import struct
def pack_mode_data(object_id: bytearray, mode_: int, submode_: int) -> bytearray:
Mode 0: Off, Mode 1: Mode On, Mode 2: Mode Normal, Mode 3: Mode Raw
# Normal mode
mode = struct.pack(">I", mode_)
# Submode default
submode = struct.pack('B', submode_)
mode_data = object_id + mode + submode
return mode_data
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.utility.tmtcc_logger import get_logger
logger = get_logger()
def pack_boolean_parameter_setting(object_id: bytearray, domain_id: int,
unique_id: int, parameter: bool, ssc: int):
Generic function to pack a telecommand to tweak a boolean parameter
@param object_id:
@param domain_id:
@param unique_id:
@param parameter:
@param ssc:
parameter_id = bytearray(4)
parameter_id[0] = domain_id
if unique_id > 255:
logger.warning("Invalid unique ID, should be smaller than 255!")
parameter_id[1] = unique_id
parameter_id[2] = 0
parameter_id[3] = 0
data_to_pack = bytearray(object_id)
# PTC and PFC for uint8_t according to CCSDS
ptc = 3
pfc = 4
rows = 1
columns = 1
return PusTelecommand(service=20, subservice=128, ssc=ssc, app_data=data_to_pack)
def pack_type_and_matrix_data(ptc: int, pfc: int, rows: int, columns: int) -> bytearray:
Packs the parameter information field, which contains the ECSS PTC and PFC numbers and the number of columns
and rows in the parameter.
p.428 for more information.
:param ptc: ECSS PTC number
:param pfc: ECSS PFC number
:param rows: Number of rows in parameter (for matrix entries, 1 for vector entries, 1 for scalar entries)
:param columns: Number of columns in parameter (for matrix or vector entries, 1 for scalar entries)
:return: Parameter information field as 4 byte bytearray
data = bytearray(4)
data[0] = ptc
data[1] = pfc
data[2] = rows
data[3] = columns
return data
def pack_parameter_id(domain_id: int, unique_id: int, linear_index: int) -> bytearray:
Packs the Parameter ID (bytearray with 4 bytes) which is part of the service 20 packets.
The first byte of the parameter ID is the domain ID, the second byte is a unique ID and the last two bytes are a
linear index if a parameter is not loaded from index 0.
:param domain_id: One byte domain ID
:param unique_id: One byte unique ID
:param linear_index: Two byte linear index.
parameter_id = bytearray(4)
parameter_id[0] = domain_id
parameter_id[1] = unique_id
parameter_id[2] = linear_index >> 8 & 0xff
parameter_id[3] = linear_index & 0xff
return parameter_id
import enum
import struct
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
class Srv3Subservice(enum.IntEnum):
def make_sid(object_id: bytearray, set_id: int) -> bytearray:
set_id_bytearray = struct.pack(">I", set_id)
return object_id + set_id_bytearray
def make_interval(interval_seconds: float) -> bytearray:
return bytearray(struct.pack("!f", interval_seconds))
def generate_one_hk_command(sid: bytearray, ssc: int):
return PusTelecommand(service=3, subservice=27, ssc=ssc, app_data=sid)
def generate_one_diag_command(sid: bytearray, ssc: int):
return PusTelecommand(service=3, subservice=28, ssc=ssc, app_data=sid)
import enum
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, TcQueueT
class Srv5Subservices(enum.IntEnum):
def pack_enable_event_reporting_command(ssc: int):
return PusTelecommand(service=5, subservice=5, ssc=ssc)
return PusTelecommand(service=5, subservice=Srv5Subservices.ENABLE_EVENT_REPORTING, ssc=ssc)
def pack_disable_event_reporting_command(ssc: int):
return PusTelecommand(service=5, subservice=6, ssc=ssc)
return PusTelecommand(service=5, subservice=Srv5Subservices.DISABLE_EVENT_REPORTING, ssc=ssc)
def pack_service5_test_into(tc_queue: TcQueueT):
tc_queue.appendleft(("print", "Testing Service 5"))
def pack_generic_service5_test_into(tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 5"))
# invalid subservice
tc_queue.appendleft(("print", "Testing Service 5: Invalid subservice"))
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 5: Invalid subservice"))
command = PusTelecommand(service=5, subservice=1, ssc=500)
# disable events
tc_queue.appendleft(("print", "Testing Service 5: Disable event"))
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 5: Disable event"))
command = pack_disable_event_reporting_command(ssc=501)
# trigger event
tc_queue.appendleft(("print", "Testing Service 5: Trigger event"))
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 5: Trigger event"))
command = PusTelecommand(service=17, subservice=128, ssc=510)
# enable event
tc_queue.appendleft(("print", "Testing Service 5: Enable event"))
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 5: Enable event"))
command = pack_enable_event_reporting_command(ssc=520)
# trigger event
tc_queue.appendleft(("print", "Testing Service 5: Trigger another event"))
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 5: Trigger another event"))
command = PusTelecommand(service=17, subservice=128, ssc=530)
tc_queue.appendleft(("export", "log/tmtc_log_service5.txt"))
\ No newline at end of file
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service5.txt"))
import enum
import struct
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
class Srv8Subservices(enum.IntEnum):
FUNC_CMD = 128,
def generate_action_command(object_id: bytearray, action_id: int, data: bytearray = bytearray([]),
ssc: int = 0) -> PusTelecommand:
data_to_pack = bytearray(object_id)
data_to_pack += make_action_id(action_id) + data
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=data_to_pack)
return PusTelecommand(
service=8, subservice=Srv8Subservices.FUNC_CMD, ssc=ssc, app_data=data_to_pack
def make_action_id(action_id: int) -> bytearray:
return bytearray(struct.pack('!I', action_id))
\ No newline at end of file
return bytearray(struct.pack('!I', action_id))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment