Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
Showing
with 1153 additions and 487 deletions
#!/usr/bin/python3.7
"""
@file tmtc_frontend.py
@date 01.11.2019
@brief This is part of the TMTC client developed by the SOURCE project by KSat
@description GUI Testing for TMTC client
@manual
@author R. Mueller, P. Scheurenbrand
"""
from multiprocessing import Process
from PyQt5.QtWidgets import *
from core.tmtc_backend import TmTcHandler
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
from tmtc_core.utility.obsw_logger import get_logger
from config import obsw_config
from config.obsw_definitions import ComInterfaces
import threading
LOGGER = get_logger()
"""
TODO: Make it look nicer. Add SOURCE or KSat logo.
"""
class TmTcFrontend:
# TODO: this list should probably be inside an enum in the obsw_config.py
serviceList = [ 2, 3, 5, 8, 9, 17, 20, 200, "Dummy", "GPS0", "GPS1" ] # , "Error"]
service_test_button: QPushButton
single_command_button: QPushButton
command_table: QTableWidget
single_command_service: int = 17
single_command_sub_service: int = 1
single_command_ssc: int = 20
single_command_data: bytearray = bytearray([])
is_busy: bool
def __init__(self):
self.tmtc_handler = TmTcHandler()
# TODO: Perform initialization on button press with specified ComIF
# Also, when changing ComIF, ensure that old ComIF is closed (e.g. with printout)
# Lock all other elements while ComIF is invalid.
self.tmtc_handler.initialize()
obsw_config.G_SERVICE = 17
obsw_config.G_COM_IF = obsw_config.ComInterfaces.QEMU
def prepare_start(self, args: any) -> Process:
return Process(target=self.start_ui)
def service_index_changed(self, index: int):
obsw_config.G_SERVICE = self.serviceList[index]
LOGGER.info("service_test_mode_selection updated: " + str(self.serviceList[index]))
def single_command_set_service(self, value):
self.single_command_service = value
def single_command_set_sub_service(self, value):
self.single_command_sub_service = value
def single_command_set_ssc(self, value):
self.single_command_ssc = value
def start_service_test_clicked(self):
LOGGER.info("start service test button pressed")
LOGGER.info("start testing service: " + str(obsw_config.G_SERVICE))
self.tmtc_handler.mode = obsw_config.ModeList.ServiceTestMode
# start the action in a new process
p = threading.Thread(target=self.handle_tm_tc_action)
p.start()
def send_single_command_clicked(self, table):
LOGGER.info("send single command pressed")
# parse the values from the table
#service = int(self.commandTable.item(0, 0).text())
#subservice = int(self.commandTable.item(0, 1).text())
#ssc = int(self.commandTable.item(0, 2).text())
LOGGER.info("service: " + str(self.single_command_service) +
", subservice: " + str(self.single_command_sub_service) +
", ssc: " + str(self.single_command_ssc))
# TODO: data needs to be parsed in a different way
# data = int(self.commandTable.item(0, 3).text())
# crc = int(self.commandTable.item(0, 4).text())
# create a command out of the parsed table
command = PusTelecommand(
service=self.single_command_service, subservice=self.single_command_sub_service,
ssc=self.single_command_ssc)
self.tmtc_handler.single_command_package = command.pack_command_tuple()
self.tmtc_handler.mode = obsw_config.ModeList.SingleCommandMode
# start the action in a new process
p = threading.Thread(target=self.handle_tm_tc_action)
p.start()
def handle_tm_tc_action(self):
LOGGER.info("start tmtc_handler.handle_action")
self.is_busy = True
self.set_send_buttons(False)
self.tmtc_handler.perform_operation()
self.is_busy = False
self.set_send_buttons(True)
LOGGER.info("finished tmtc_handler.handle_action")
def set_send_buttons(self, state: bool):
self.service_test_button.setEnabled(state)
self.single_command_button.setEnabled(state)
def start_ui(self):
app = QApplication([])
win = QWidget()
grid = QGridLayout()
row = 0
grid.addWidget(QLabel("Configuration:"), row, 0, 1, 2)
row += 1
checkbox_console = QCheckBox("print output to console")
checkbox_console.setChecked(obsw_config.G_PRINT_TM)
checkbox_console.stateChanged.connect(checkbox_console_print)
checkbox_log = QCheckBox("print output to log file")
checkbox_log.setChecked(obsw_config.G_PRINT_TO_FILE)
checkbox_log.stateChanged.connect(checkbox_log_print)
checkbox_raw_tm = QCheckBox("print all raw TM data directly")
checkbox_raw_tm.setChecked(obsw_config.G_PRINT_RAW_TM)
checkbox_raw_tm.stateChanged.connect(checkbox_print_raw_data)
checkbox_hk = QCheckBox("print hk data")
checkbox_hk.setChecked(obsw_config.G_PRINT_HK_DATA)
checkbox_hk.stateChanged.connect(checkbox_print_hk_data)
checkbox_short = QCheckBox("short display mode")
checkbox_short.setChecked(obsw_config.G_DISPLAY_MODE == "short")
checkbox_short.stateChanged.connect(checkbox_short_display_mode)
grid.addWidget(checkbox_log, row, 0, 1, 1)
grid.addWidget(checkbox_console, row, 1, 1, 1)
row += 1
grid.addWidget(checkbox_raw_tm, row, 0, 1, 1)
grid.addWidget(checkbox_hk, row, 1, 1, 1)
row += 1
grid.addWidget(checkbox_short, row, 0, 1, 1)
row += 1
grid.addWidget(QLabel("TM Timeout:"), row, 0, 1, 1)
grid.addWidget(QLabel("TM Timeout Factor:"), row, 1, 1, 1)
row += 1
spin_timeout = QDoubleSpinBox()
spin_timeout.setValue(4)
# TODO: set sensible min/max values
spin_timeout.setSingleStep(0.1)
spin_timeout.setMinimum(0.25)
spin_timeout.setMaximum(60)#
# https://youtrack.jetbrains.com/issue/PY-22908
# Ignore those warnings for now.
spin_timeout.valueChanged.connect(number_timeout)
grid.addWidget(spin_timeout, row, 0, 1, 1)
spin_timeout_factor = QDoubleSpinBox()
spin_timeout_factor.setValue(obsw_config.G_TC_SEND_TIMEOUT_FACTOR)
# TODO: set sensible min/max values
spin_timeout_factor.setSingleStep(0.1)
spin_timeout_factor.setMinimum(0.25)
spin_timeout_factor.setMaximum(10)
spin_timeout_factor.valueChanged.connect(number_timeout_factor)
grid.addWidget(spin_timeout_factor, row, 1, 1, 1)
row += 1
grid.addWidget(QLabel("Client IP:"), row, 0, 1, 1)
grid.addWidget(QLabel("Board IP:"), row, 1, 1, 1)
row += 1
spin_client_ip = QLineEdit()
# TODO: set sensible min/max values
spin_client_ip.setInputMask("000.000.000.000;_")
spin_client_ip.textChanged.connect(ip_change_client)
grid.addWidget(spin_client_ip, row, 0, 1, 1)
spin_board_ip = QLineEdit()
# TODO: set sensible min/max values
spin_board_ip.setInputMask("000.000.000.000;_")
spin_board_ip.textChanged.connect(ip_change_board)
#spin_board_ip.setText(obsw_config.G_SEND_ADDRESS[0])
grid.addWidget(spin_board_ip, row, 1, 1, 1)
row += 1
# com if configuration
grid.addWidget(QLabel("Communication Interface:"), row, 0, 1, 1)
com_if_comboBox = QComboBox()
# add all possible ComIFs to the comboBox
for comIf in obsw_config.ComInterfaces:
com_if_comboBox.addItem(comIf.name)
com_if_comboBox.setCurrentIndex(obsw_config.G_COM_IF.value)
com_if_comboBox.currentIndexChanged.connect(com_if_index_changed)
grid.addWidget(com_if_comboBox, row, 1, 1, 1)
row += 1
# service test mode gui
grid.addWidget(QLabel("Service Test Mode:"), row, 0, 1, 2)
row += 1
comboBox = QComboBox()
for service in self.serviceList:
comboBox.addItem("Service - " + str(service))
comboBox.setCurrentIndex(self.serviceList.index(obsw_config.G_SERVICE))
comboBox.currentIndexChanged.connect(self.service_index_changed)
grid.addWidget(comboBox, row, 0, 1, 1)
self.service_test_button = QPushButton()
self.service_test_button.setText("Start Service Test")
self.service_test_button.clicked.connect(self.start_service_test_clicked)
grid.addWidget(self.service_test_button, row, 1, 1, 1)
row += 1
# single command operation
grid.addWidget(QLabel("Single Command Operation:"), row, 0, 1, 1)
row += 1
single_command_grid = QGridLayout()
single_command_grid.setSpacing(5)
single_command_grid.addWidget(QLabel("Service:"), row, 0, 1, 1)
single_command_grid.addWidget(QLabel("SubService:"), row, 1, 1, 1)
single_command_grid.addWidget(QLabel("SSC:"), row, 2, 1, 1)
row += 1
spin_service = QSpinBox()
spin_service.setValue(self.single_command_service)
# TODO: set sensible min/max values
spin_service.setMinimum(0)
spin_service.setMaximum(99999)
spin_service.valueChanged.connect(self.single_command_set_service)
single_command_grid.addWidget(spin_service, row, 0, 1, 1)
spin_sub_service = QSpinBox()
spin_sub_service.setValue(self.single_command_sub_service)
# TODO: set sensible min/max values
spin_sub_service.setMinimum(0)
spin_sub_service.setMaximum(99999)
spin_sub_service.valueChanged.connect(self.single_command_set_sub_service)
single_command_grid.addWidget(spin_sub_service, row, 1, 1, 1)
spin_ssc = QSpinBox()
spin_ssc.setValue(self.single_command_ssc)
# TODO: set sensible min/max values
spin_ssc.setMinimum(0)
spin_ssc.setMaximum(99999)
spin_ssc.valueChanged.connect(self.single_command_set_ssc)
single_command_grid.addWidget(spin_ssc, row, 2, 1, 1)
row += 1
single_command_grid.addWidget(QLabel("Data:"), row, 0, 1, 3)
row += 1
# TODO: how should this be converted to the byte array?
single_command_data_box = QTextEdit()
single_command_grid.addWidget(single_command_data_box, row, 0, 1, 3)
grid.addItem(single_command_grid, row, 0, 1, 2)
row += 1
#self.commandTable = SingleCommandTable()
#grid.addWidget(self.commandTable, row, 0, 1, 2)
row += 1
self.single_command_button = QPushButton()
self.single_command_button.setText("Send single command")
self.single_command_button.clicked.connect(self.send_single_command_clicked)
grid.addWidget(self.single_command_button, row, 0, 1, 2)
row += 1
win.setLayout(grid)
win.resize(900, 800)
win.show()
# resize table columns to fill the window width
#for i in range(0, 5):
# self.commandTable.setColumnWidth(i, int(self.commandTable.width() / 5) - 3)
app.exec_()
class SingleCommandTable(QTableWidget):
def __init__(self):
super().__init__()
self.setRowCount(1)
self.setColumnCount(5)
self.setHorizontalHeaderItem(0, QTableWidgetItem("Service"))
self.setHorizontalHeaderItem(1, QTableWidgetItem("Subservice"))
self.setHorizontalHeaderItem(2, QTableWidgetItem("SSC"))
self.setHorizontalHeaderItem(3, QTableWidgetItem("Data"))
self.setHorizontalHeaderItem(4, QTableWidgetItem("CRC"))
self.setItem(0, 0, QTableWidgetItem("17"))
self.setItem(0, 1, QTableWidgetItem("1"))
self.setItem(0, 2, QTableWidgetItem("20"))
def com_if_index_changed(index: int):
obsw_config.G_COM_IF = ComInterfaces(index)
LOGGER.info("com if updated: " + str(obsw_config.G_COM_IF))
def checkbox_console_print(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " console print")
obsw_config.G_PRINT_TM = state == 0
def checkbox_log_print(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " print to log")
obsw_config.G_PRINT_TO_FILE = state == 0
def checkbox_print_raw_data(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " printing of raw data")
obsw_config.G_PRINT_RAW_TM = state == 0
def checkbox_print_hk_data(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " printing of hk data")
obsw_config.G_PRINT_HK_DATA = state == 0
def checkbox_short_display_mode(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " short display mode")
obsw_config.G_DISPLAY_MODE = ["short", "long"][state == 0]
def number_timeout(value: float):
LOGGER.info("tm timeout changed to: " + str(value))
obsw_config.G_TM_TIMEOUT = value
def number_timeout_factor(value: float):
LOGGER.info("tm timeout factor changed to: " + str(value))
obsw_config.G_TC_SEND_TIMEOUT_FACTOR = value
def ip_change_client(value):
LOGGER.info("client ip changed: " + value)
obsw_config.G_REC_ADDRESS = (value, 2008)
def ip_change_board(value):
LOGGER.info("board ip changed: " + value)
obsw_config.G_SEND_ADDRESS = (value, 7)
\ No newline at end of file
#!/usr/bin/python3.7
# -*- coding: utf-8 -*-
"""
@file
obsw_tmtc_gui.py
@date
01.11.2019
@brief
This is part of the TMTC client developed by the SOURCE project by KSat
@description
GUI Testing for TMTC client
@manual
@author:
R. Mueller
"""
import tkinter as tk
from multiprocessing.connection import Client
from multiprocessing import Process
from tmtc_core.utility.obsw_logger import get_logger
import time
LOGGER = get_logger()
# A first simple version has drop down menus to chose all necessary options
# which are normally handled by the args parser.
# when pressing save, all chosen values get passed to the globals file (OBSW_Config)
# To start the program, another button with start needs to be set up.
# A third button to perform a keyboard interrupt should be implemented
# include a really nice source badge and make it large !
# plan this on paper first...
# Step 1: Huge Mission Badge in Tkinter window because that is cool.
# Step 2: Simple buttons to run servce tests around the badge.
class TmTcGUI(Process):
def __init__(self):
super(TmTcGUI, self).__init__()
self.root = None
self.address = ('localhost', 6000)
self.conn = Client(self.address, authkey=None)
self.counter = 0
def run(self):
self.open()
def open(self):
self.root = tk.Tk()
self.root.title("Hallo Welt")
while True:
LOGGER.info("TmTcGUI: Sending test message")
self.conn.send("test")
self.root.update()
time.sleep(0.1)
self.counter = self.counter + 1
if self.counter == 50:
self.conn.send("close")
self.conn.close()
break
# self.window.mainloop()
def close(self):
pass
# self.window.quit()
aiohttp==3.6.2
astroid==2.4.2
async-timeout==3.0.1
attrs==19.3.0
chardet==3.0.4
colorama==0.4.3
cpplint==1.5.4
crcmod>=1.7
docopt==0.6.2
future==0.18.2
idna==2.10
iso8601==0.1.12
isort==5.4.2
lazy-object-proxy==1.5.1
mccabe==0.6.1
multidict==4.7.6
pylint>=2.5.3
PyQt5>=5.15.0
PyQt5-sip>=12.8.0
PyQt5>=5.15.1
PyQt5-stubs>=5.14.2.2
pyserial>=3.4
PyYAML==5.3.1
six==1.15.0
toml==0.10.1
typing-extensions==3.7.4.2
wrapt==1.11.2
yarl==1.5.1
from typing import Union
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, Deque
from tc.obsw_tc_service8 import make_action_id
from tc.obsw_tc_service20 import pack_boolean_parameter_setting
from tmtc_core.utility.obsw_logger import get_logger
from config.obsw_config import SW_IMAGE_HANDLER_ID
LOGGER = get_logger()
def generate_copy_bl_sdc_to_flash_packet(ssc: int, from_fram: bool = False,
object_id: bytearray = SW_IMAGE_HANDLER_ID):
app_data = bytearray(object_id)
app_data += make_action_id(11)
app_data.append(from_fram)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
# Slot values: 0 -> Slot 0, 1 -> Slot 1, 2 -> Update Slot
def generate_copy_obsw_sdc_to_flash_packet(ssc: int, slot: int,
object_id: bytearray = SW_IMAGE_HANDLER_ID):
app_data = bytearray(object_id)
app_data += make_action_id(4)
app_data.append(slot)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
def generate_param_packet_hamming_from_sdcard(enable: bool, ssc: int,
object_id: bytearray = SW_IMAGE_HANDLER_ID):
return pack_boolean_parameter_setting(object_id=object_id, domain_id=0, unique_id=0,
parameter=enable, ssc=ssc)
def generate_img_handler_packet(service_queue: Deque, op_code: Union[int, str]):
# Action ID 4 (Copy SDC to Flash, Software Update Image)
if op_code == "A4U":
service_queue.appendleft(("print", "Generating command to copy SDC OBSW to flash."))
command = generate_copy_obsw_sdc_to_flash_packet(ssc=0, slot=2)
service_queue.appendleft(command.pack_command_tuple())
# Action ID 11 (Copy bootloader from SDC to flash
elif op_code == "A11S":
service_queue.appendleft(("print", "Generating command to copy SDC bootloader to flash."))
command = generate_copy_bl_sdc_to_flash_packet(0)
service_queue.appendleft(command.pack_command_tuple())
elif op_code == "P0":
service_queue.appendleft(("print", "Configuring hamming code to be taken from SD card"))
command = generate_param_packet_hamming_from_sdcard(enable=True, ssc=0)
service_queue.appendleft(command.pack_command_tuple())
elif op_code == "P1":
service_queue.appendleft(("print", "Configuring hamming code to be taken from FRAM"))
command = generate_param_packet_hamming_from_sdcard(enable=False, ssc=0)
service_queue.appendleft(command.pack_command_tuple())
......@@ -49,4 +49,4 @@ def pack_tc_info(tc_list: List[PusTelecommand]) -> List[PusTcInfo]:
tc_info_list = list()
for tc in tc_list:
tc_info_list.append(tc.pack_information())
return tc_info_list
\ No newline at end of file
return tc_info_list
......@@ -14,11 +14,14 @@ from tc.obsw_tc_service2 import pack_service2_test_into
from tc.obsw_tc_service3 import pack_service3_test_into
from tc.obsw_tc_service8 import pack_service8_test_into
from tc.obsw_tc_service9 import pack_service9_test_into
from tc.obsw_tc_service23 import pack_service23_test_into
from tc.obsw_tc_service23_sdcard import pack_service23_commands_into
from tc.obsw_tc_service20 import pack_service20_test_into
from tc.obsw_tc_utility import pack_utility_command
from tc.obsw_tc_service200 import pack_mode_data, pack_service200_test_into
from tc.obsw_tc_service5_17 import pack_service5_test_into, pack_service17_test_into
from tc.obsw_image_handler import generate_img_handler_packet
from tc.obsw_tc_gps import pack_gps_test_into
from tc.obsw_tc_core import pack_core_command
from tmtc_core.utility.obsw_logger import get_logger
import config.obsw_config as g
......@@ -32,7 +35,8 @@ class ServiceQueuePacker:
def __init__(self):
pass
def pack_service_queue(self, service: Union[int, str], service_queue: TcQueueT):
@staticmethod
def pack_service_queue(service: Union[int, str], op_code: int, service_queue: TcQueueT):
if service == 2:
return pack_service2_test_into(service_queue)
if service == 3:
......@@ -44,24 +48,30 @@ class ServiceQueuePacker:
if service == 9:
return pack_service9_test_into(service_queue)
if service == 17:
return pack_service17_test_into(service_queue)
return pack_service17_test_into(service_queue, op_code)
if service == 20:
return pack_service20_test_into(service_queue)
if service == 23:
return pack_service23_test_into(service_queue)
if service == 23 or service.lower() == "sd":
return pack_service23_commands_into(service_queue, op_code)
if service == 200:
return pack_service200_test_into(service_queue)
if service == "Dummy":
if service.lower() == "dummy":
return pack_dummy_device_test_into(service_queue)
if service == "GPS0":
if service.lower() == "img":
return generate_img_handler_packet(service_queue, op_code)
if service.lower() == "core":
return pack_core_command(service_queue, op_code)
if service.lower() == "led":
return pack_utility_command(service_queue, op_code)
if service.lower() == "gps0":
# Object ID: GPS Device
object_id = g.GPS0_DEVICE_ID
return pack_gps_test_into(object_id, service_queue)
if service == "GPS1":
if service.lower() == "gps1":
# Object ID: GPS Device
object_id = g.GPS1_DEVICE_ID
return pack_gps_test_into(object_id, service_queue)
if service == "Error":
if service.lower() == "Error":
return pack_error_testing_into(service_queue)
LOGGER.warning("Invalid Service !")
......@@ -77,6 +87,7 @@ def create_total_tc_queue() -> TcQueueT:
tc_queue = pack_service8_test_into(tc_queue)
tc_queue = pack_service9_test_into(tc_queue)
tc_queue = pack_service17_test_into(tc_queue)
tc_queue = pack_service20_test_into(tc_queue)
tc_queue = pack_service200_test_into(tc_queue)
tc_queue = pack_dummy_device_test_into(tc_queue)
object_id = g.GPS0_DEVICE_ID
......
from typing import Deque
from config.obsw_config import CORE_CONTROLLER_ID
from tc.obsw_tc_service8 import make_action_id
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
def pack_core_command(tc_queue: Deque, op_code, ssc: int = 0):
if op_code.lower() == "a0":
tc_queue.appendleft(("print", "Generating OBC run time stats"))
command = generate_run_time_stats_generation_command(ssc=ssc)
tc_queue.appendleft(command.pack_command_tuple())
if op_code.lower() == "a10":
tc_queue.appendleft(("print", "Resetting OBC."))
command = generate_reset_obc_command(ssc=ssc)
tc_queue.appendleft(command.pack_command_tuple())
if op_code.lower() == "a11":
tc_queue.appendleft(("print", "Power cycling OBC."))
command = generate_power_cycle_obc_command(ssc=ssc)
tc_queue.appendleft(command.pack_command_tuple())
def generate_reset_obc_command(ssc: int, object_id: bytearray = CORE_CONTROLLER_ID):
app_data = bytearray(object_id)
app_data += make_action_id(10)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
def generate_power_cycle_obc_command(ssc: int, object_id: bytearray = CORE_CONTROLLER_ID):
app_data = bytearray(object_id)
app_data += make_action_id(11)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
def generate_run_time_stats_generation_command(ssc: int, object_id: bytearray = CORE_CONTROLLER_ID):
app_data = bytearray(object_id)
app_data += make_action_id(0)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
......@@ -11,6 +11,7 @@ from tc.obsw_tc_service2 import pack_mode_data
import config.obsw_config as g
def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
if object_id == g.GPS0_DEVICE_ID:
gps_string = "GPS0"
......
......@@ -9,15 +9,52 @@ import struct
from typing import Deque
import config.obsw_config as g
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, TcQueueT
from tmtc_core.utility.obsw_logger import get_logger
from tc.obsw_tc_service200 import pack_mode_data
LOGGER = get_logger()
def pack_boolean_parameter_setting(object_id: bytearray, domain_id: int,
unique_id: int, parameter: bool, ssc: int):
"""
Generic function to pack a telecommand to tweak a boolean parameter
@param object_id:
@param domain_id:
@param unique_id:
@param parameter:
@param ssc:
@return:
"""
parameter_id = bytearray(4)
parameter_id[0] = domain_id
if unique_id > 255:
LOGGER.warning("Invalid unique ID, should be smaller than 255!")
return
parameter_id[1] = unique_id
parameter_id[2] = 0
parameter_id[3] = 0
data_to_pack = bytearray(object_id)
data_to_pack.extend(parameter_id)
# PTC and PFC for uint8_t according to CCSDS
ptc = 3
pfc = 4
rows = 1
columns = 1
data_to_pack.append(ptc)
data_to_pack.append(pfc)
data_to_pack.append(rows)
data_to_pack.append(columns)
data_to_pack.append(parameter)
return PusTelecommand(service=20, subservice=128, ssc=ssc, app_data=data_to_pack)
def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque:
#parameter IDs
# parameter IDs
parameterID0 = 0
parameterID1 = 1
parameterID2 = 2
parameterID1 = 256
parameterID2 = 512
if called_externally is False:
tc_queue.appendleft(("print", "Testing Service 20"))
......@@ -29,121 +66,68 @@ def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -
command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
#test invalid subservice
#use subservice 130 for invalid subservice check, as this is in use for dump reply
#(and therefore will never be a valid subservice)
#tc_queue.appendleft(("print", "Testing Service 20: Invalid subservice"))
#mode_data = pack_mode_data(object_id, 2, 0)
#command = PusTelecommand(service=20, subservice=130, ssc=810, app_data=mode_data)
#tc_queue.appendleft(command.pack_command_tuple())
#test invalid objectid //TODO: do we have an objectid known to be empty (even in future)?
#tc_queue.appendleft(("print", "Testing Service 20: Invalid object ID"))
#mode_data = pack_mode_data(object_id, 2, 0)
#command = PusTelecommand(service=20, subservice=128, ssc=810, app_data=mode_data)
#tc_queue.appendleft(command.pack_command_tuple())
#test invalid parameterID for load
#tc_queue.appendleft(("print", "Testing Service 20: Invalid parameter ID for load"))
#mode_data = pack_mode_data(object_id, 2, 0)
#command = PusTelecommand(service=20, subservice=128, ssc=810, app_data=mode_data)
#tc_queue.appendleft(command.pack_command_tuple())
#test invalid parameterID for dump
#tc_queue.appendleft(("print", "Testing Service 20: Invalid parameter ID for dump"))
#mode_data = pack_mode_data(object_id, 2, 0)
#command = PusTelecommand(service=20, subservice=129, ssc=810, app_data=mode_data)
#tc_queue.appendleft(command.pack_command_tuple())
#test checking Load for uint32_t
# test checking Load for uint32_t
tc_queue.appendleft(("print", "Testing Service 20: Load uint32_t"))
parameter_id = struct.pack(">I", parameterID0)
type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1)
parameter_data = struct.pack(">I", 42)
payload = object_id + parameter_id + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2001, app_data=payload)
payload = object_id + parameter_id+ type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
#test checking Dump for uint32_t
# test checking Dump for uint32_t
tc_queue.appendleft(("print", "Testing Service 20: Dump uint32_t"))
parameter_id = struct.pack(">I", parameterID0)
payload = object_id + parameter_id
command = PusTelecommand(service=20, subservice=129, ssc=2001, app_data=payload)
command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
if called_externally is False:
tc_queue.appendleft(("export", "log/tmtc_log_service20.txt"))
return tc_queue
"""
#test checking Load for int32_t
# test checking Load for int32_t
tc_queue.appendleft(("print", "Testing Service 20: Load int32_t"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=20, subservice=128, ssc=2003, app_data=mode_data)
parameter_id = struct.pack(">I", parameterID1)
type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1)
parameter_data = struct.pack(">i", -42)
payload = object_id + parameter_id+ type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
#test checking Dump for int32_t
# test checking Dump for int32_t
tc_queue.appendleft(("print", "Testing Service 20: Dump int32_t"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=20, subservice=129, ssc=2004, app_data=mode_data)
parameter_id = struct.pack(">I", parameterID1)
payload = object_id + parameter_id
command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
#test checking Load for float
# test checking Load for float
tc_queue.appendleft(("print", "Testing Service 20: Load float"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=20, subservice=128, ssc=2005, app_data=mode_data)
parameter_id = struct.pack(">I", parameterID2)
type_and_matrix_data = pack_type_and_matrix_data(5, 1, 1, 1)
parameter_data = struct.pack(">f", 4.2)
payload = object_id + parameter_id+ type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
#test checking Dump for float
# test checking Dump for float
tc_queue.appendleft(("print", "Testing Service 20: Dump float"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=20, subservice=129, ssc=2006, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
"""
"""
# set mode on
tc_queue.appendleft(("print", "Testing Service 8: Set On Mode"))
mode_data = pack_mode_data(object_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
parameter_id = struct.pack(">I", parameterID2)
payload = object_id + parameter_id
command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
# set mode normal
tc_queue.appendleft(("print", "Testing Service 8: Set Normal Mode"))
mode_data = pack_mode_data(object_id, 2, 0)
command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
if called_externally is False:
tc_queue.appendleft(("export", "log/tmtc_log_service20.txt"))
return tc_queue
# Direct command which triggers completion reply
tc_queue.appendleft(("print", "Testing Service 8: Trigger Completion Reply"))
action_id = g.DUMMY_COMMAND_1
direct_command = object_id + action_id
command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command)
tc_queue.appendleft(command.pack_command_tuple())
# Direct command which triggers _tm_data reply
tc_queue.appendleft(("print", "Testing Service 8: Trigger Data Reply"))
action_id = g.DUMMY_COMMAND_2
command_param1 = g.DUMMY_COMMAND_2_PARAM_1
command_param2 = g.DUMMY_COMMAND_2_PARAM_2
direct_command = object_id + action_id + command_param1 + command_param2
command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command)
tc_queue.appendleft(command.pack_command_tuple())
def pack_type_and_matrix_data(ptc, pfc, column, row):
data = bytearray(4)
data[0] = ptc
data[1] = pfc
data[2] = column
data[3] = row
return data
# Direct command which triggers an additional step reply and one completion reply
tc_queue.appendleft(("print", "Testing Service 8: Trigger Step and Completion Reply"))
action_id = g.DUMMY_COMMAND_3
direct_command = object_id + action_id
command = PusTelecommand(service=8, subservice=128, ssc=840, app_data=direct_command)
tc_queue.appendleft(command.pack_command_tuple())
# set mode off
tc_queue.appendleft(("print", "Testing Service 8: Set Off Mode"))
mode_data = pack_mode_data(object_id, 0, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("wait", 2))
"""
def pack_service23_commands_into(tc_queue: TcQueueT, op_code: int):
if op_code == 0:
pack_service20_test_into(tc_queue=tc_queue)
# -*- coding: utf-8 -*-
"""
Created: 21.01.2020 07:48
@author: Jakob Meier
"""
import math
import config.obsw_config as g
from typing import Deque
from tc.obsw_pus_tc_packer import PusTelecommand
class Service23WriteToFile:
""" This class generates the telecommand packet for the custom PUS service [23, 128]
@param objectID(bytearray): The objectID of the filesystem handler (e.g. SD_CARD_HANDLER_ID, PLOCHandler)
@param repository_path(str): The directory of the file
@param filename(str): The name of the file (e.g. boot.bin)
@param file_data(bytearray): The data to write to the file
"""
def __init__(self, repository_path: str, filename: str, size_of_data_blocks: int,
object_id: bytearray = bytearray([]), file_data: bytearray = bytearray([])):
"""
@param repository_path:
@param filename:
@param object_id:
@param file_data:
"""
self.file_data = []
self.number_of_packets = 0
self.file_data = file_data
self.data_to_pack = object_id
repository_path_length_h = len(bytearray(repository_path, 'utf-8')) >> 8
repository_path_length_l = 0x00FF & len(bytearray(repository_path, 'utf-8'))
self.data_to_pack.append(repository_path_length_h)
self.data_to_pack.append(repository_path_length_l)
if repository_path != "":
self.data_to_pack += bytearray(repository_path, 'utf-8')
self.data_to_pack.append(len(bytearray(filename, 'utf-8')))
self.data_to_pack += bytearray(filename, 'utf-8')
self.pus_packets = self.split_large_file(size_of_data_blocks)
def split_large_file(self, size_of_data_blocks):
""" This function splits a large file in multiple packets
This is necessary because the packet size is limited
:param size_of_data_blocks: The file is splitted in data blocks of this size
:return: List containing the PUS packets generated for each data block
"""
self.number_of_packets = math.floor(len(self.file_data) / size_of_data_blocks)
packetNumber = 0
commands = []
for i in range(self.number_of_packets):
# append packet number
self.data_to_pack.append(size_of_data_blocks)
self.data_to_pack.append(packetNumber >> 8)
self.data_to_pack.append(0xFF & packetNumber)
self.data_to_pack += self.file_data[i * size_of_data_blocks:(i + 1) * size_of_data_blocks]
commands.append(PusTelecommand(service=23, subservice=128, ssc=21, app_data=self.data_to_pack))
# Last data block, packet number and data block size is removed to create new command with next data block,
# packet number and data block size
self.data_to_pack = self.data_to_pack[:len(self.data_to_pack) - size_of_data_blocks - 2 - 1]
packetNumber = packetNumber + 1
# Calculate remaining data bytes
remaining_data_size = len(self.file_data) - self.number_of_packets * size_of_data_blocks
self.data_to_pack.append(remaining_data_size)
self.data_to_pack.append(packetNumber >> 8)
self.data_to_pack.append(0xFF & packetNumber)
self.data_to_pack += self.file_data[self.number_of_packets * size_of_data_blocks:len(self.file_data)]
commands.append(PusTelecommand(service=23, subservice=128, ssc=21, app_data=self.data_to_pack))
return commands
def get_number_of_packets(self):
return self.number_of_packets
# def sendSoftwareUpdate(comInterface, tmtcPrinter, tmListener, tmTimeout, tcTimeoutFactor, doPrintToFile):
# """ This function sends all packets to upload a new version of the software image
# The new image is assumed to be stored in the tmtc directory
# """
# image = open("sourceobsw-at91sam9g20_ek-sdram.bin", "rb").read()
# update = Service23WriteToFile("", "boot.bin", objectID=[0x4D, 0x00, 0x73, 0xAD], file_data=image)
# updateQueue = deque()
# updateQueue.appendleft(("print", "Service 23 Software Update"))
# i = 1
# for packet in update.pus_packets:
# updateQueue.appendleft(
# ("print", "Packet " + str(i) + "/" + str(len(update.pus_packets)) + " of software update sent"))
# updateQueue.appendleft(packet.packCommandTuple())
# i = i + 1
# updateQueue.appendleft(("print", "Transmission of software update complete"))
# SenderAndReceiver = SequentialCommandSenderReceiver(comInterface, tmtcPrinter, tmListener, tmTimeout, updateQueue,
# tcTimeoutFactor, doPrintToFile)
# SenderAndReceiver.sendQueueTcAndReceiveTmSequentially()
def generate_service23_subservice2_packet(filename: str, repositoryPath: str = "",
object_id=bytearray([])):
""" This function generates the application data field of a service 23/subservice 2 to
delete a file on a file-system-capable on-board memory.
: param filename: The name of the file to delete
repository_path: The path where the directory shall be created
objectID: The object ID of the memory handler which manages the file system
:return The TC[23,2] PUS packet
"""
data_to_pack = object_id
data_to_pack += bytearray(repositoryPath, 'utf-8')
# Add string terminator to repository_path
data_to_pack.append(0)
data_to_pack += bytearray(filename, 'utf-8')
# Add string terminator to filename
data_to_pack.append(0)
return PusTelecommand(service=23, subservice=2, ssc=21, app_data=data_to_pack)
def generate_service23_subservice9_packet(directory_name: str, repository_path: str = "",
object_id=bytearray([])):
""" This function generates the application data field of a service 23/subservice 9 packet.
This service can be used to create directories on file systems.
: param repository_path: The path where the directory shall be created
directoryName: The name of the directory to create
objectID: The object ID of the memory handler which manages the file system
:return
"""
data_to_pack = object_id
data_to_pack += bytearray(repository_path, 'utf-8')
# Add string terminator to repository path
data_to_pack.append(0)
data_to_pack += bytearray(directory_name, 'utf-8')
# Add string terminator to directory name
data_to_pack.append(0)
return PusTelecommand(service=23, subservice=9, ssc=21, app_data=data_to_pack)
def generateService23Subservice10Packet(directory_name: str, repository_path: str = "",
object_id=bytearray([])):
""" This function generates the application data field for a PUS packet with service
23 and subservie 10.
This service deletes the directory dirname.
@param directory_name: Name of the directory to delete
@param repository_path: Path to directory dirname
@param object_id: object ID of the memory handler (e.g. SD Card Handler)
@return The application data field of the (23,10) PUS packet
"""
data_to_pack = object_id
data_to_pack += bytearray(repository_path, 'utf-8')
# Add string terminator of repository path
data_to_pack.append(0)
data_to_pack += bytearray(directory_name, 'utf-8')
# Add string terminator of directory name
data_to_pack.append(0)
return PusTelecommand(service=23, subservice=10, ssc=21, app_data=data_to_pack)
def generate_service23_subservice128_packet(repository_path: str, filename: str,
object_id: bytearray = bytearray([]),
file_data: bytearray = bytearray([])):
""" This function generates the application data field for a PUS packet with service
23 and subservie 128. Subservice 128 is a custom service to write data in a file.
Additionally file is created if not already existing.
@param repository_path: The path of the target file
@param filename: Name of file from which the content shall be read
@param object_id: object ID of the memory handler (e.g. SD Card Handler)
@param file_data: The data to write in the file
@return: The application data field of the (23,128) PUS packet
"""
data_to_pack = object_id
data_to_pack += bytearray(repository_path, 'utf-8')
# Add string terminator of repository path
data_to_pack.append(0)
data_to_pack += bytearray(filename, 'utf-8')
# Add string terminator of filename
data_to_pack.append(0)
return split_large_file(data_to_pack, 236, file_data)
def split_large_file(data_to_pack: bytearray, size_of_data_blocks, data: bytearray):
"""
This function splits a large file in multiple packets.
This is necessary because the packet size is limited.
@param data_to_pack: bytearray of data. data will be split and appended to this bytearray
@param size_of_data_blocks: The file is splitted in data blocks of this size
@param data: The data to pack in multiple packets
@return List containing the PUS packets generated for each data block
"""
numberOfPackets = math.floor(len(data) / size_of_data_blocks)
packetNumber = 0
commands = []
for i in range(numberOfPackets):
data_to_pack.append(packetNumber >> 8)
data_to_pack.append(0xFF & packetNumber)
data_to_pack += data[i * size_of_data_blocks:(i + 1) * size_of_data_blocks]
commands.append(PusTelecommand(service=23, subservice=128, ssc=21, app_data=data_to_pack))
# Last data block, packet number and data block size is removed to create new command with
# next data block, packet number
data_to_pack = data_to_pack[:len(data_to_pack) - size_of_data_blocks - 2]
packetNumber = packetNumber + 1
data_to_pack.append(packetNumber >> 8)
data_to_pack.append(0xFF & packetNumber)
data_to_pack += data[numberOfPackets * size_of_data_blocks:len(data)]
commands.append(PusTelecommand(service=23, subservice=128, ssc=21, app_data=data_to_pack))
return commands
def generate_service23_subservice129_packet(repository_path: str, filename: str,
object_id: bytearray = bytearray([])):
"""
This function generates the application data field for a PUS packet with service
23 and subservie 129. Subservice 129 is a custom service to request the data of a file.
@param repository_path: The path of the target file
@param filename: Name of file from which the content shall be read
@param object_id: object ID of the memory handler (e.g. SD Card Handler)
@return: The application data field of the (23,129) PUS packet
"""
data_to_pack = object_id
data_to_pack += bytearray(repository_path, 'utf-8')
# Add string terminator of repository paht
data_to_pack.append(0)
data_to_pack += bytearray(filename, 'utf-8')
# Add string terminator of filename
data_to_pack.append(0)
return PusTelecommand(service=23, subservice=129, ssc=21, app_data=data_to_pack)
def pack_service23_test_into(tc_queue: Deque) -> Deque:
sd_handler_id = g.SD_CARD_HANDLER_ID
tc_queue.appendleft(("print", "Testing Service 23"))
tc_queue.append(("print", "Create directory 'test'"))
command = generate_service23_subservice9_packet(directory_name="test", object_id=sd_handler_id)
tc_queue.appendleft(command.pack_command_tuple())
# tc_queue.append(("print", "Create subdirectory 'subdir' in 'test'"))
# command = generate_service23_subservice9_packet(repository_path="test", directory_name="subdir",
# object_id=sd_handler_id)
# tc_queue.appendleft(command.pack_command_tuple())
# tc_queue.appendleft(("print", "Create and write in test.bin"))
# command = generate_service23_subservice128_packet(
# "test/subdir", "test.bin", sd_handler_id, file_data=bytearray([0x01, 0x00, 0x01, 0x00]))
# tc_queue.appendleft(command[0].pack_command_tuple())
# tc_queue.appendleft(("print", "Read data of test.bin"))
# command = generate_service23_subservice129_packet("test/subdir", "test.bin", sd_handler_id)
# tc_queue.appendleft(command.pack_command_tuple())
# tc_queue.appendleft(("print", "Delete 'test.bin'"))
# command = generate_service23_subservice2_packet("test.bin", "test/subdir", object_id=sd_handler_id)
# tc_queue.appendleft(command.pack_command_tuple())
# tc_queue.appendleft(("print", "Delete 'subdir' directory"))
# command = generateService23Subservice10Packet("subdir", "test", object_id=sd_handler_id)
# tc_queue.appendleft(command.pack_command_tuple())
# tc_queue.appendleft(("print", "Delete 'test' directory"))
# command = generateService23Subservice10Packet(directory_name="test", object_id=sd_handler_id)
# tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "\r"))
return tc_queue
# -*- coding: utf-8 -*-
"""
Created: 21.01.2020 07:48
@author: Jakob Meier
"""
import config.obsw_config as g
from typing import Deque, Union
from tc.obsw_pus_tc_packer import PusTelecommand, TcQueueT
from tc.obsw_tc_service8 import make_action_id
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
def generate_print_sd_card_packet(
ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
app_data = bytearray(object_id)
app_data += make_action_id(2)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
def generate_clear_sd_card_packet(
ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
app_data = bytearray(object_id)
app_data += make_action_id(20)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
def generate_format_sd_card_packet(
ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
app_data = bytearray(object_id)
app_data += make_action_id(21)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
def generate_generic_folder_structure(
tc_queue: Deque, init_ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID,
iobc: bool = False):
tc_queue.appendleft(("print", "Creating TC folder"))
command = generate_mkdir_srv23_9_packet("TC", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TC", directory_name="LARGE", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TC", directory_name="SMALL", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
tc_queue.appendleft(("print", "Creating TM folder"))
command = generate_mkdir_srv23_9_packet("TM", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TM", directory_name="HK", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TM", directory_name="SC", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
command = generate_mkdir_srv23_9_packet(
repository_path="TM/SC", directory_name="LARGE", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TM/SC", directory_name="SMALL", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
tc_queue.appendleft(("print", "Creating BIN folder"))
command = generate_mkdir_srv23_9_packet("BIN", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
if iobc:
command = generate_mkdir_srv23_9_packet(
repository_path="BIN", directory_name="IOBC", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="BIN/IOBC", directory_name="BL", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="BIN/IOBC", directory_name="OBSW", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
else:
command = generate_mkdir_srv23_9_packet(
repository_path="BIN", directory_name="AT91", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="BIN/AT91", directory_name="BL", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="BIN/AT91", directory_name="OBSW", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
tc_queue.appendleft(("print", "Creating MISC folder"))
command = generate_mkdir_srv23_9_packet("MISC", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
def generate_create_file_srv23_1_packet(
filename: str, repository_path: str, ssc: int, max_size_of_app_data: int,
initial_data: bytearray = bytearray([]),
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> Union[PusTelecommand, None]:
if len(initial_data) > calculate_allowed_file_data_size(
max_size_of_app_data, filename, repository_path):
LOGGER.error("generate_create_file_srv23_1_packet: Initial data too large!")
return None
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
data_to_pack += initial_data
return PusTelecommand(service=23, subservice=1, ssc=ssc, app_data=data_to_pack)
def generate_rm_file_srv23_2_packet(filename: str, repository_path: str,
ssc: int, object_id=g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
This function generates a packet which is used to delete a file on a
file-system-capable on-board memory.
@param filename: The name of the file to delete
@param repository_path: The path where the directory shall be created
@param ssc: source sequence count
@param object_id: The object ID of the memory handler which manages the file system
@return The telecommand.
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
return PusTelecommand(service=23, subservice=2, ssc=ssc, app_data=data_to_pack)
def generate_report_file_attr_srv23_3_packet(
filename: str, repository_path: str, ssc: int,
object_id=g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
@param filename: The name of the file to delete
@param repository_path: The path where the directory shall be created
@param ssc: source sequence count
@param object_id: The object ID of the memory handler which manages the file system
@return The telecommand.
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
return PusTelecommand(service=23, subservice=3, ssc=ssc, app_data=data_to_pack)
def generate_mkdir_srv23_9_packet(directory_name: str, ssc: int, repository_path: str = "/",
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
This function generates a packet which is used to create directories on file systems.
@param directory_name: The path where the directory shall be created
@param repository_path: The name of the directory to create
@param ssc: source sequence count
@param object_id: The object ID of the memory handler which manages the file system
@return The telecommand.
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=directory_name)
return PusTelecommand(service=23, subservice=9, ssc=ssc, app_data=data_to_pack)
def generate_rmdir_srv23_10_packet(directory_name: str, repository_path: str, ssc: int,
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
This function generates a packet which deletes the a directory at the specified repository path.
@param directory_name: Name of the directory to delete
@param repository_path: Path to directory dirname
@param ssc: source sequence count
@param object_id: object ID of the memory handler (e.g. SD Card Handler)
@return The telecommand.
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=directory_name)
return PusTelecommand(service=23, subservice=10, ssc=ssc, app_data=data_to_pack)
def generate_append_to_file_srv23_130_packet(
filename: str, repository_path: str, packet_sequence_number: int,
ssc: int, file_data: bytearray = bytearray([]),
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
data_to_pack += file_data
data_to_pack.append(packet_sequence_number >> 8)
data_to_pack.append(packet_sequence_number & 0xff)
return PusTelecommand(service=23, subservice=130, ssc=ssc, app_data=data_to_pack)
def generate_finish_append_to_file_srv23_131_packet(
filename: str, repository_path: str, ssc: int, lock_file: bool = False,
object_id: bytearray = g.SD_CARD_HANDLER_ID):
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
data_to_pack.append(lock_file)
return PusTelecommand(service=23, subservice=131, ssc=ssc, app_data=data_to_pack)
def generate_read_file_srv23_140_packet(
repository_path: str, filename: str, ssc: int,
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
This function generates the application data field for a PUS packet with service
23 and subservie 140. Subservice 140 is a custom service to request the data of a file.
@param repository_path: The path of the target file
@param filename: Name of file from which the content shall be read
@param ssc: source sequence count
@param object_id: object ID of the memory handler (e.g. SD Card Handler)
@return: The application data field of the (23,129) PUS packet
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
return PusTelecommand(service=23, subservice=140, ssc=ssc, app_data=data_to_pack)
def generate_lock_file_srv23_5_6_packet(ssc: int, lock: bool, repository_path: str,
filename: str, object_id: bytearray = g.SD_CARD_HANDLER_ID):
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
if lock:
return PusTelecommand(service=23, subservice=5, ssc=ssc, app_data=data_to_pack)
else:
return PusTelecommand(service=23, subservice=6, ssc=ssc, app_data=data_to_pack)
def pack_service23_commands_into(tc_queue: TcQueueT, op_code: int) -> Deque:
# sd_handler_id = g.SD_CARD_HANDLER_ID
if op_code == 0:
generate_generic_service_23_test(tc_queue)
elif op_code == "3" or op_code == 3:
LOGGER.info("Press h in the following input requests")
LOGGER.info("to send a command to display the folder structure instead")
(repo_path, filename) = prompt_for_repo_filename()
if repo_path == "" and filename == "h":
tc_queue.appendleft(generate_print_sd_card_packet(ssc=0).pack_command_tuple())
elif repo_path == "" and filename == "c":
return tc_queue
else:
tc_queue.append(("print", "Requesting file attributes"))
command = generate_report_file_attr_srv23_3_packet(
ssc=0, filename=filename, repository_path=repo_path)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "5" or op_code == 5:
LOGGER.info("Press h in the following input requests")
LOGGER.info("to send a command to display the folder structure instead")
(repo_path, filename) = prompt_for_repo_filename()
if repo_path == "" and filename == "h":
tc_queue.append(("print", "Printing active file system"))
tc_queue.appendleft(generate_print_sd_card_packet(ssc=0).pack_command_tuple())
elif repo_path == "" and filename == "c":
return tc_queue
else:
tc_queue.append(("print", "Locking file"))
command = generate_lock_file_srv23_5_6_packet(ssc=0, repository_path=repo_path,
filename=filename, lock=True)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "6" or op_code == 6:
LOGGER.info("Press h in the following input requests to send a command to display the"
"folder structure instead")
(repo_path, filename) = prompt_for_repo_filename()
if repo_path == "" and filename == "h":
tc_queue.append(("print", "Printing active file system"))
tc_queue.appendleft(generate_print_sd_card_packet(ssc=0).pack_command_tuple())
elif repo_path == "" and filename == "c":
return tc_queue
else:
tc_queue.append(("print", "Unlocking file"))
command = generate_lock_file_srv23_5_6_packet(ssc=0, repository_path=repo_path,
filename=filename, lock=False)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "A2":
tc_queue.append(("print", "Printing active file system"))
command = generate_print_sd_card_packet(ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "A20":
tc_queue.append(("print", "Clearing active file system"))
command = generate_clear_sd_card_packet(ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "A21":
tc_queue.append(("print", "Formatting active file system"))
command = generate_format_sd_card_packet(ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "C0A":
tc_queue.append(("print", "Generating generic folder structure on AT91"))
generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=False)
elif op_code == "C0I":
tc_queue.append(("print", "Generating generic folder structure on iOBC"))
generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=True)
return tc_queue
def generate_generic_service_23_test(tc_queue: TcQueueT):
tc_queue.appendleft(("print", "Testing Service 23"))
tc_queue.append(("print", "Create directory 'test'"))
command = generate_mkdir_srv23_9_packet(directory_name="test", repository_path="/", ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.append(("print", "Create subdirectory 'subdir' in 'test'"))
command = generate_mkdir_srv23_9_packet(repository_path="test", ssc=2301,
directory_name="subdir")
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Create test.bin"))
command = generate_create_file_srv23_1_packet(
filename="test.bin", repository_path="test/subdir", ssc=2302,
initial_data=bytearray([0x01, 0x00, 0x01, 0x00]),
max_size_of_app_data=1024)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Printing file structure."))
command = generate_print_sd_card_packet(ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Clearing SD card"))
command = generate_clear_sd_card_packet(ssc=2301)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Printing file structure"))
command = generate_print_sd_card_packet(ssc=2302)
tc_queue.appendleft(command.pack_command_tuple())
# tc_queue.appendleft(("print", "Read data of test.bin"))
# command = generate_read_file_srv23_129_packet("test/subdir", "test.bin", ssc=2303)
# tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Delete 'test.bin'"))
command = generate_rm_file_srv23_2_packet(
filename="test.bin", repository_path="test/subdir", ssc=2304)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Delete 'subdir' directory"))
command = generate_rmdir_srv23_10_packet(directory_name="subdir", repository_path="test",
ssc=2305)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Delete 'test' directory"))
command = generate_rmdir_srv23_10_packet(directory_name="test", repository_path="/",
ssc=2306)
tc_queue.appendleft(command.pack_command_tuple())
def calculate_allowed_file_data_size(max_app_data_size: int, filename: str, repository: str):
# Subtract two because of '\0' terminators
return max_app_data_size - len(filename) - len(repository) - 2
def prompt_for_repo_filename():
input_confirmation = False
repo_path = ""
filename = ""
while not input_confirmation:
repo_path = input("Please type in repository path [c to cancel]: ")
if repo_path == "h":
return "", "h"
filename = input("Please type in filename path [c to cancel]: ")
print("Selection for repostiory path: " + str(repo_path))
print("Selection for filename: " + str(filename))
if repo_path == "c" or filename == "c":
return "", "c"
if repo_path == "h" or filename == "h":
return "", "h"
input_confirmation = input("Confirm selection [y/n]: ")
if input_confirmation in ['y', "yes", 1]:
input_confirmation = True
return repo_path, filename
def pack_generic_file_command_header(object_id: bytearray, first_path: str, second_path: str):
data_to_pack = bytearray(object_id)
data_to_pack += first_path.encode('utf-8')
# Add string terminator of repository paht
data_to_pack.append(0)
data_to_pack += second_path.encode('utf-8')
# Add string terminator of filename
data_to_pack.append(0)
return data_to_pack
......@@ -36,23 +36,45 @@ def pack_service5_test_into(tc_queue: TcQueueT) -> TcQueueT:
return tc_queue
def pack_service17_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(("print", "Testing Service 17"))
# ping test
tc_queue.appendleft(("print", "Testing Service 17: Ping Test"))
command = PusTelecommand(service=17, subservice=1, ssc=1700)
tc_queue.appendleft(command.pack_command_tuple())
# enable event
tc_queue.appendleft(("print", "Testing Service 17: Enable Event"))
command = PusTelecommand(service=5, subservice=5, ssc=52)
tc_queue.appendleft(command.pack_command_tuple())
# test event
tc_queue.appendleft(("print", "Testing Service 17: Trigger event"))
command = PusTelecommand(service=17, subservice=128, ssc=1701)
def pack_service17_test_into(tc_queue: TcQueueT, op_code: int = 0) -> TcQueueT:
if op_code == 0:
tc_queue.appendleft(("print", "Testing Service 17"))
# ping test
tc_queue.appendleft(("print", "Testing Service 17: Ping Test"))
command = PusTelecommand(service=17, subservice=1, ssc=1700)
tc_queue.appendleft(command.pack_command_tuple())
# enable event
tc_queue.appendleft(("print", "Testing Service 17: Enable Event"))
command = PusTelecommand(service=5, subservice=5, ssc=52)
tc_queue.appendleft(command.pack_command_tuple())
# test event
tc_queue.appendleft(("print", "Testing Service 17: Trigger event"))
command = PusTelecommand(service=17, subservice=128, ssc=1701)
tc_queue.appendleft(command.pack_command_tuple())
# invalid subservice
tc_queue.appendleft(("print", "Testing Service 17: Invalid subservice"))
command = PusTelecommand(service=17, subservice=243, ssc=1702)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("export", "log/tmtc_log_service17.txt"))
elif op_code == 129:
pack_enable_periodic_print_packet(tc_queue, True, 0)
elif op_code == 130:
pack_enable_periodic_print_packet(tc_queue, False, 0)
elif op_code == 150:
pack_trigger_exception_packet(tc_queue, 0)
return tc_queue
def pack_enable_periodic_print_packet(tc_queue: TcQueueT, enable: bool, ssc: int):
tc_queue.appendleft(("print", "Enabling periodic printout"))
if enable:
command = PusTelecommand(service=17, subservice=129, ssc=ssc)
else:
command = PusTelecommand(service=17, subservice=130, ssc=ssc)
tc_queue.appendleft(command.pack_command_tuple())
# invalid subservice
tc_queue.appendleft(("print", "Testing Service 17: Invalid subservice"))
command = PusTelecommand(service=17, subservice=243, ssc=1702)
def pack_trigger_exception_packet(tc_queue: TcQueueT, ssc: int):
tc_queue.appendleft(("print", "Triggering software exception"))
command = PusTelecommand(service=17, subservice=150, ssc=ssc)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("export", "log/tmtc_log_service17.txt"))
return tc_queue
......@@ -5,6 +5,7 @@
@author R. Mueller
@date 01.11.2019
"""
import struct
from typing import Deque
import config.obsw_config as g
......@@ -62,3 +63,14 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) ->
if called_externally is False:
tc_queue.appendleft(("export", "log/tmtc_log_service8.txt"))
return tc_queue
def generate_action_command(object_id: bytearray, action_id: int, data: bytearray = bytearray([]),
ssc: int = 0):
data_to_pack = bytearray(object_id)
data_to_pack += make_action_id(action_id) + data
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=data_to_pack)
def make_action_id(action_id: int) -> bytearray:
return bytearray(struct.pack('!I', action_id))
......@@ -36,4 +36,4 @@ def pack_service9_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(command.pack_command_tuple())
# TODO: Add other time formats here
tc_queue.appendleft(("export", "log/tmtc_log_service9.txt"))
return tc_queue
\ No newline at end of file
return tc_queue
from typing import Union
from tmtc_core.tc.obsw_pus_tc_base import TcQueueT
from tc.obsw_tc_service8 import generate_action_command
from config.obsw_config import LED_TASK_ID
def pack_utility_command(service_queue: TcQueueT, op_code: Union[str, int]):
if op_code == "A0":
service_queue.appendleft(generate_action_command(LED_TASK_ID, 0x00).pack_command_tuple())
elif op_code == "A1":
service_queue.appendleft(generate_action_command(LED_TASK_ID, 0x01).pack_command_tuple())
......@@ -240,10 +240,12 @@ class TestService(unittest.TestCase):
if current_tm_info[TmDictionaryKeys.SERVICE] == 1:
self.scan_for_respective_tc(current_tm_info)
# Here, the desired event Id or RID can be specified
elif current_tm_info[TmDictionaryKeys.SERVICE] == 5:
# TODO: hardcoded values.. should be in config file
if (current_tm_info[TmDictionaryKeys.EVENT_ID] == 8300 and
current_tm_info[TmDictionaryKeys.REPORTER_ID] == 0x51001700):
if current_tm_info[TmDictionaryKeys.EVENT_ID] == 9700 and \
current_tm_info[TmDictionaryKeys.REPORTER_ID] == \
(struct.unpack('>I', g.PUS_SERVICE_17)[0]):
self.event_counter = self.event_counter + 1
elif current_tm_info[TmDictionaryKeys.SERVICE] == 17:
self.misc_counter = self.misc_counter + 1
......
......@@ -11,7 +11,7 @@ from typing import Deque
from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys, AssertionDictKeys
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoQueueT
from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \
pack_service2_test_into, pack_service8_test_into, pack_service200_test_into
pack_service2_test_into, pack_service8_test_into, pack_service200_test_into, pack_service20_test_into
import config.obsw_config as g
from tmtc_core.utility.obsw_logger import get_logger
......
import struct
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry
from tmtc_core.utility.obsw_logger import get_logger
from tm.obsw_tm_service_1 import Service1TM
from tm.obsw_tm_service_3 import Service3TM
from tm.obsw_tm_service_5 import Service5TM
import struct
from tm.obsw_tm_service_20 import Service20TM
from tm.obsw_tm_service_23 import Service23TM
LOGGER = get_logger()
......@@ -24,6 +28,8 @@ def tm_factory_hook(raw_tm_packet: bytearray) -> PusTelemetry:
return Service17TM(raw_tm_packet)
if service_type == 20:
return Service20TM(raw_tm_packet)
if service_type == 23:
return Service23TM(raw_tm_packet)
if service_type == 200:
return Service200TM(raw_tm_packet)
LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory")
......@@ -86,23 +92,6 @@ class Service17TM(PusTelemetry):
return
class Service20TM(PusTelemetry):
def __init__(self, byte_array):
super().__init__(byte_array)
self.parameter_id = struct.unpack('>I', self._tm_data[0:4])[0]
self.specify_packet_info("Functional Commanding Reply")
def append_telemetry_content(self, array):
super().append_telemetry_content(array)
array.append(self.parameter_id)
return
def append_telemetry_column_headers(self, array):
super().append_telemetry_column_headers(array)
array.append("param0_dump_repl")
return
class Service200TM(PusTelemetry):
def __init__(self, byte_array):
super().__init__(byte_array)
......@@ -141,4 +130,4 @@ class Service200TM(PusTelemetry):
elif self.isModeReply:
array.append("Mode")
array.append("Submode")
return
\ No newline at end of file
return
import struct
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT
class Service20TM(PusTelemetry):
def __init__(self, byte_array):
super().__init__(byte_array)
datasize = len(self._tm_data)
self.objectId = struct.unpack('>I', self._tm_data[0:4])[0]
self.parameter_id = struct.unpack('>I', self._tm_data[4:8])[0]
if self.get_subservice() == 130:
self.type = struct.unpack('>H', self._tm_data[8:10])[0]
self.type_ptc = self._tm_data[8]
self.type_pfc = self._tm_data[9]
self.column = self._tm_data[10]
self.row = self._tm_data[11]
if self.type_ptc == 3 and self.type_pfc == 14:
self.param = struct.unpack('>I', self._tm_data[12:datasize])[0]
if self.type_ptc == 4 and self.type_pfc == 14:
self.param = struct.unpack('>i', self._tm_data[12:datasize])[0]
if self.type_ptc == 5 and self.type_pfc == 1:
self.param = struct.unpack('>f', self._tm_data[12:datasize])[0]
else:
logger.info("Error when receiving Pus Service 20 TM: subservice is != 130, but 130 is \
only known subservice")
self.specify_packet_info("Functional Commanding Reply")
def append_telemetry_content(self, array):
super().append_telemetry_content(array)
array.append(hex(self.objectId))
array.append(self.parameter_id)
if self.get_subservice() == 130:
array.append("PTC: " + str(self.type_ptc) + " | PFC: " + str(self.type_pfc))
array.append(str(self.column))
array.append(str(self.row))
if self.type_ptc == 5 and self.type_pfc == 1:
array.append(str(float(self.param)))
else:
array.append(str(hex(self.param)))
return
def append_telemetry_column_headers(self, array):
super().append_telemetry_column_headers(array)
array.append("objectID")
array.append("parameterID")
if self.get_subservice() == 130:
array.append("type")
array.append("column")
array.append("row")
array.append("parameter")
return
def pack_tm_information(self) -> PusTmInfoT:
tm_information = super().pack_tm_information()
add_information = {
TmDictionaryKeys.REPORTER_ID: self.objectId,
TmDictionaryKeys.EVENT_ID: self.parameter_id,
TmDictionaryKeys.EVENT_PARAM_1: self.param
}
tm_information.update(add_information)
return tm_information
import struct
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
class Service23TM(PusTelemetry):
MAX_REPOSITORY_LENGTH = 64
MAX_FILENAME_LENGTH = 12
def __init__(self, byte_array):
super().__init__(byte_array)
if len(self.get_tm_data()) < 4:
LOGGER.error("Service23TM: Invalid packet format!")
return
self.object_id = struct.unpack('!I', self._tm_data[0:4])[0]
self.repo_path = ""
self.filename = ""
self.file_size = 0
self.lock_status = False
self.data_start_idx = 0
if self.get_subservice() == 4:
self.unpack_repo_and_filename()
self.unpack_file_attributes()
elif self.get_subservice() == 132:
self.unpack_repo_and_filename()
pass
def unpack_repo_and_filename(self):
repo_path_found = False
path_idx_start = 0
max_len_to_scan = len(self.get_tm_data()) - 4
for idx in range(4, max_len_to_scan):
if not repo_path_found and self._tm_data[idx] == 0:
repo_bytes = self._tm_data[4:idx]
self.repo_path = repo_bytes.decode('utf-8')
path_idx_start = idx + 1
idx += 1
repo_path_found = True
if repo_path_found:
if self._tm_data[idx] == 0:
filename_bytes = self._tm_data[path_idx_start:idx]
self.filename = filename_bytes.decode('utf-8')
self.data_start_idx = idx + 1
break
def unpack_file_attributes(self):
# Size of file length (4) + lock status (1), adapt if more field are added!
print(len(self.get_tm_data()) - self.data_start_idx)
if len(self.get_tm_data()) - self.data_start_idx != 5:
LOGGER.error("Service23TM: Invalid lenght of file attributes data")
return
self.file_size = struct.unpack('!I', self.get_tm_data()[
self.data_start_idx: self.data_start_idx + 4])[0]
self.lock_status = self.get_tm_data()[self.data_start_idx + 4]
def append_telemetry_content(self, content_list: list):
super().append_telemetry_content(content_list)
content_list.append(hex(self.object_id))
content_list.append(self.repo_path)
content_list.append(self.filename)
if self.get_subservice() == 4:
content_list.append(self.file_size)
if self.lock_status == 0:
content_list.append("No")
else:
content_list.append("Yes")
def append_telemetry_column_headers(self, header_list: list):
super().append_telemetry_column_headers(header_list)
header_list.append("Object ID")
header_list.append("Repo Path")
header_list.append("File Name")
if self.get_subservice() == 4:
header_list.append("File Size")
header_list.append("Locked")