diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000000000000000000000000000000000..35352782cb6d6e3d6e999c6ac8ecfadf5e8b0f9e --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "tmtc_core"] + path = tmtc_core + url = https://git.ksat-stuttgart.de/source/tmtc_core.git diff --git a/comIF/__init__.py b/comIF/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/comIF/obsw_com_config.py b/comIF/obsw_com_config.py deleted file mode 100644 index 4810be9050bb7427b8acab9317a407ed71cae56f..0000000000000000000000000000000000000000 --- a/comIF/obsw_com_config.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -Set-up function. Initiates the communication interface. -""" -import sys -from typing import Union - -from comIF.obsw_com_interface import CommunicationInterface -from comIF.obsw_dummy_com_if import DummyComIF -from comIF.obsw_ethernet_com_if import EthernetComIF -from comIF.obsw_serial_com_if import SerialComIF, SerialCommunicationType -from comIF.obsw_qemu_com_if import QEMUComIF - -from utility.obsw_logger import get_logger -from utility.obsw_tmtc_printer import TmTcPrinter - -import config.obsw_config as g - -LOGGER = get_logger() - - -def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[CommunicationInterface, None]: - """ - Return the desired communication interface object - :param tmtc_printer: TmTcPrinter object. - :return: CommunicationInterface object - """ - try: - if g.G_COM_IF == g.ComIF.Ethernet: - communication_interface = EthernetComIF( - tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, - tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_SEND_ADDRESS, - receive_address=g.G_REC_ADDRESS) - elif g.G_COM_IF == g.ComIF.Serial: - serial_baudrate = g.G_SERIAL_BAUDRATE - serial_timeout = g.G_SERIAL_TIMEOUT - serial_frame_size = g.G_SERIAL_FRAME_SIZE - communication_interface = SerialComIF( - tmtc_printer=tmtc_printer, com_port=g.G_COM_PORT, baud_rate=serial_baudrate, - serial_timeout=serial_timeout, - ser_com_type=SerialCommunicationType.FIXED_FRAME_BASED, - com_type_args=serial_frame_size) - elif g.G_COM_IF == g.ComIF.QEMU: - communication_interface = QEMUComIF( - tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, - tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR) - else: - communication_interface = DummyComIF(tmtc_printer=tmtc_printer) - if not communication_interface.valid: - LOGGER.warning("Invalid communication interface!") - return None - return communication_interface - except (IOError, OSError): - LOGGER.error("Error setting up communication interface") - LOGGER.exception("Error") - sys.exit() diff --git a/comIF/obsw_com_interface.py b/comIF/obsw_com_interface.py deleted file mode 100644 index 4567bb2c01e42f24e7da715e7552b6ebff11249e..0000000000000000000000000000000000000000 --- a/comIF/obsw_com_interface.py +++ /dev/null @@ -1,78 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: obsw_com_interface.py -Date: 01.11.2019 -Description: Generic Communication Interface. Defines the syntax of the communication functions. - Abstract methods must be implemented by child class (e.g. Ethernet Com IF) - -@author: R. Mueller -""" -from abc import abstractmethod -from typing import Tuple -from tm.obsw_pus_tm_factory import PusTmListT -from utility.obsw_tmtc_printer import TmTcPrinter -from tc.obsw_pus_tc_base import PusTcInfoT - - -# pylint: disable=useless-return -# pylint: disable=no-self-use -# pylint: disable=unused-argument -class CommunicationInterface: - """ - Generic form of a communication interface to separate communication logic from - the underlying interface. - """ - def __init__(self, tmtc_printer: TmTcPrinter): - self.tmtc_printer = tmtc_printer - self.valid = False - - @abstractmethod - def close(self) -> None: - """ - Closes the ComIF and releases any held resources (for example a Communication Port) - :return: - """ - - @abstractmethod - def send_data(self, data: bytearray): - """ - Send data, for example a frame containing packets. - """ - - @abstractmethod - def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: - """ - Send telecommands - :param tc_packet: TC wiretapping_packet to send - :param tc_packet_info: TC wiretapping_packet information - :return: None for now - """ - - @abstractmethod - def receive_telemetry(self, parameters: any = 0) -> PusTmListT: - """ - Returns a list of packets. Most of the time, - this will simply call the pollInterface function - :param parameters: - :return: - """ - packet_list = [] - return packet_list - - @abstractmethod - def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]: - """ - Poll the interface and return a list of received packets - :param parameters: - :return: Tuple: boolean which specifies wheather a wiretapping_packet was received, - and the wiretapping_packet list containing - Tm packets - """ - - @abstractmethod - def data_available(self, parameters: any) -> bool: - """ - Check whether TM data is available - :param parameters: - :return: - """ diff --git a/comIF/obsw_dummy_com_if.py b/comIF/obsw_dummy_com_if.py deleted file mode 100644 index 03be5f9a743d15a80a1d51aeb7bbcd34fd1a71ea..0000000000000000000000000000000000000000 --- a/comIF/obsw_dummy_com_if.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -@file obsw_dummy_com_if.py -@date 09.03.2020 -@brief Dummy Communication Interface - -@author R. Mueller -""" -from typing import Tuple - -from comIF.obsw_com_interface import CommunicationInterface -from tc.obsw_pus_tc_base import PusTelecommand, PusTcInfoT, TcDictionaryKeys -from tm.obsw_pus_tm_factory import PusTelemetryFactory -from tm.obsw_tm_service_1 import Service1TmPacked -from utility.obsw_logger import get_logger - -LOGGER = get_logger() - - -class DummyComIF(CommunicationInterface): - def __init__(self, tmtc_printer): - super().__init__(tmtc_printer) - self.service_sent = 0 - self.reply_pending = False - self.ssc = 0 - self.tc_ssc = 0 - self.tc_packet_id = 0 - - def close(self) -> None: - pass - - def data_available(self, parameters): - if self.reply_pending: - return True - return False - - def poll_interface(self, parameters: any = 0) -> Tuple[bool, list]: - pass - - def receive_telemetry(self, parameters: any = 0): - tm_list = [] - if (self.service_sent == 17 or self.service_sent == 5) and self.reply_pending: - LOGGER.debug("receive crap called") - tm_packer = Service1TmPacked(subservice=1, ssc=self.ssc, tc_packet_id=self.tc_packet_id, - tc_ssc=self.tc_ssc) - - tm_packet_raw = tm_packer.pack() - tm_packet = PusTelemetryFactory.create(tm_packet_raw) - tm_list.append(tm_packet) - tm_packer = Service1TmPacked(subservice=7, ssc=self.ssc, tc_packet_id=self.tc_packet_id, - tc_ssc=self.tc_ssc) - tm_packet_raw = tm_packer.pack() - tm_packet = PusTelemetryFactory.create(tm_packet_raw) - tm_list.append(tm_packet) - self.reply_pending = False - self.ssc += 1 - return tm_list - - def send_telecommand(self, tc_packet: PusTelecommand, tc_packet_info: PusTcInfoT = None) -> None: - if isinstance(tc_packet_info, dict) and tc_packet_info.__len__() > 0: - self.service_sent = tc_packet_info[TcDictionaryKeys.SERVICE] - self.tc_packet_id = tc_packet_info[TcDictionaryKeys.PACKET_ID] - self.tc_ssc = tc_packet_info[TcDictionaryKeys.SSC] - self.reply_pending = True diff --git a/comIF/obsw_ethernet_com_if.py b/comIF/obsw_ethernet_com_if.py deleted file mode 100644 index ae7efef1fd79f7bb6d694dd5138c1b7774090d89..0000000000000000000000000000000000000000 --- a/comIF/obsw_ethernet_com_if.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -@file obsw_ethernet_com_if.py -@date 01.11.2019 -@brief Ethernet Communication Interface - -@author R. Mueller -""" -import select -import socket -import sys -from typing import Tuple - -from comIF.obsw_com_interface import CommunicationInterface, PusTmListT -from tm.obsw_pus_tm_factory import PusTelemetryFactory -from tc.obsw_pus_tc_base import PusTcInfoT -from utility.obsw_tmtc_printer import TmTcPrinter -from config.obsw_definitions import ethernetAddressT - - -# pylint: disable=abstract-method -# pylint: disable=arguments-differ -# pylint: disable=too-many-arguments -class EthernetComIF(CommunicationInterface): - """ - Communication interface for UDP communication. - """ - - def send_data(self, data: bytearray): - self.udp_socket.sendto(data, self.destination_address) - - def __init__(self, tmtc_printer: TmTcPrinter, tm_timeout: float, tc_timeout_factor: float, - receive_address: ethernetAddressT, send_address: ethernetAddressT): - super().__init__(tmtc_printer) - self.tm_timeout = tm_timeout - self.tc_timeout_factor = tc_timeout_factor - self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.receive_address = receive_address - self.destination_address = send_address - self.set_up_socket(self.receive_address) - - def close(self) -> None: - pass - - def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: - self.udp_socket.sendto(tc_packet, self.destination_address) - - def data_available(self, timeout: float = 0) -> bool: - ready = select.select([self.udp_socket], [], [], timeout) - if ready[0]: - return True - return False - - def poll_interface(self, poll_timeout: float = 0) -> Tuple[bool, PusTmListT]: - ready = self.data_available(poll_timeout) - if ready: - data = self.udp_socket.recvfrom(1024)[0] - tm_packet = PusTelemetryFactory.create(bytearray(data)) - self.tmtc_printer.print_telemetry(tm_packet) - packet_list = [tm_packet] - return True, packet_list - return False, [] - - def receive_telemetry(self, parameters: any = 0) -> list: - (packet_received, packet_list) = self.poll_interface() - if packet_received: - return packet_list - return [] - - def set_up_socket(self, receive_address): - """ - Sets up the sockets for the UDP communication. - :return: - """ - try: - self.udp_socket.bind(receive_address) - self.udp_socket.setblocking(False) - except OSError: - print("Socket already set-up.") - except TypeError: - print("Invalid Receive Address") - sys.exit() - - def connect_to_board(self): - """ - For UDP, this can be used to initiate communication. - :return: - """ - ping = bytearray([]) - self.send_telecommand(ping) diff --git a/comIF/obsw_qemu_com_if.py b/comIF/obsw_qemu_com_if.py deleted file mode 100755 index d23d4bcfbc88d67865bb05d9ff0944431d120bd7..0000000000000000000000000000000000000000 --- a/comIF/obsw_qemu_com_if.py +++ /dev/null @@ -1,505 +0,0 @@ -#!/usr/bin/env python -""" -QEMU Communication Interface to communicate with emulated QEMU hardware via the UART interface. - -It utilizes the the asyncio library. - -Requirements: - Python >= 3.7 (asyncio support) - -Instructions: - Run QEMU (modified for OBSW) via - - qemu-system-arm -M isis-obc -monitor stdio \ - -bios path/to/sourceobsw-at91sam9g20_ek-sdram.bin \ - -qmp unix:/tmp/qemu,server -S - - Then run the telecommand script with -c 2 -""" - -import asyncio -import struct -import json -import re -import errno -import sys -import time -from threading import Thread -from typing import Tuple -from logging import getLogger -from comIF.obsw_com_interface import CommunicationInterface, PusTcInfoT, PusTmListT -from tm.obsw_pus_tm_factory import PusTelemetryFactory -from comIF.obsw_serial_com_if import SerialComIF - -LOGGER = getLogger() -SERIAL_FRAME_LENGTH = 256 - -# Paths to Unix Domain Sockets used by the emulator -QEMU_ADDR_QMP = "/tmp/qemu" -QEMU_ADDR_AT91_USART0 = "/tmp/qemu_at91_usart0" -QEMU_ADDR_AT91_USART2 = "/tmp/qemu_at91_usart2" - -# Request/response category and command IDs -IOX_CAT_DATA = 0x01 -IOX_CAT_FAULT = 0x02 - -IOX_CID_DATA_IN = 0x01 -IOX_CID_DATA_OUT = 0x02 - -IOX_CID_FAULT_OVRE = 0x01 -IOX_CID_FAULT_FRAME = 0x02 -IOX_CID_FAULT_PARE = 0x03 -IOX_CID_FAULT_TIMEOUT = 0x04 - - -def start_background_loop(loop: asyncio.AbstractEventLoop) -> None: - asyncio.set_event_loop(loop) - loop.run_forever() - - -class QEMUComIF(CommunicationInterface): - """ - Specific Communication Interface implementation of the QEMU USART protocol for the TMTC software - """ - def __init__(self, tmtc_printer, tm_timeout, tc_timeout_factor): - super().__init__(tmtc_printer) - self.tm_timeout = tm_timeout - self.tc_timeout_factor = tc_timeout_factor - self.loop = asyncio.get_event_loop() - self.number_of_packets = 0 - self.data = [] - if not self.loop.is_running(): - self.thread = Thread(target=start_background_loop, - args=(self.loop,), daemon=True) - self.thread.start() - try: - self.usart = asyncio.run_coroutine_threadsafe( - Usart.create_async(QEMU_ADDR_AT91_USART0), self.loop).result() - asyncio.run_coroutine_threadsafe(self.usart.open(), self.loop).result() - except NotImplementedError: - LOGGER.exception("QEMU Initialization error, file does not exist!") - sys.exit() - - def close(self): - self.usart.close() - - def send_data(self, data: bytearray): - asyncio.run_coroutine_threadsafe( - self.send_telecommand_async(data), self.loop).result() - - def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: - asyncio.run_coroutine_threadsafe( - self.send_telecommand_async(tc_packet), self.loop).result() - - async def send_telecommand_async(self, tc_packet): - await self.usart.write(tc_packet) - self.usart.inject_timeout_error() - - def receive_telemetry(self, parameters=0): - (packet_received, packet_list) = self.poll_interface() - return packet_list - - def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]: - packet_list = [] - if self.data_available(): - pus_data_list = self.poll_pus_packets() - for pus_packet in pus_data_list: - packet = PusTelemetryFactory.create(pus_packet) - packet_list.append(packet) - return True, packet_list - return False, packet_list - - def data_available(self, timeout: any = 0) -> bool: - if self.usart.new_data_available(): - return True - if timeout > 0: - start_time = time.time() - elapsed_time = 0 - while elapsed_time < timeout: - if self.usart.new_data_available(): - return True - elapsed_time = time.time() - start_time - return False - - def poll_pus_packets(self): - packets = self.poll_pus_packets_async() - return packets - - def poll_pus_packets_async(self): - self.data = self.usart.read(SERIAL_FRAME_LENGTH) - return SerialComIF.poll_pus_packets_fixed_frames(self.data) - - # todo: use function from serial com IF to deducde redundant code. - # def poll_pus_packets_async(self): - # pus_data_list = [] - # self.data = self.usart.read(256) - # payload_length = (self.data[4] << 8 | self.data[5]) - # packet_size = payload_length + 7 - # if payload_length == 0: - # return [], 0 - # read_size = len(self.data) - # self.number_of_packets = 1 - # pus_data = self.data[0:packet_size] - # pus_data_list.append(pus_data) - # self.__handle_multiple_packets(packet_size, read_size, pus_data_list) - # return pus_data_list, self.number_of_packets - # - # def __handle_multiple_packets(self, packet_size, read_size, pus_data_list): - # end_of_buffer = read_size - 1 - # end_index = packet_size - # while end_index < end_of_buffer: - # end_index = self.__parse_next_packets(end_index, pus_data_list) - # - # def __parse_next_packets(self, end_index: int, pus_data_list: list) -> int: - # start_index = end_index - # end_index = end_index + 5 - # next_payload_len = (self.data[end_index - 1] << 8 | self.data[end_index]) - # next_packet_size = next_payload_len + 7 - # if next_packet_size > 256: - # print("PUS Polling: Very large packet detected, " - # "large packet reading not implemented yet !") - # print("Detected Size: " + str(next_packet_size)) - # return end_index - # if next_payload_len == 0: - # end_index = 256 - # return end_index - # end_index = start_index + next_packet_size - # pus_data = self.data[start_index:end_index] - # pus_data_list.append(pus_data) - # self.number_of_packets = self.number_of_packets + 1 - # return end_index - - -class QmpException(Exception): - """An exception caused by the QML/QEMU as response to a failed command""" - - def __init__(self, ret, *args, **kwargs): - Exception.__init__(self, f"QMP error: {ret}") - self.ret = ret # the 'return' structure provided by QEMU/QML - - -class QmpConnection: - """A connection to a QEMU machine via QMP""" - def __init__(self, addr=QEMU_ADDR_QMP): - self.transport = None - self.addr = addr - self.dataq = asyncio.Queue() - self.initq = asyncio.Queue() - self.proto = None - - def _protocol(self): - """The underlying transport protocol""" - - if self.proto is None: - self.proto = QmpProtocol(self) - - return self.proto - - async def _wait_check_return(self): - """ - Wait for the status return of a command and raise an exception if it - indicates a failure - """ - - resp = await self.dataq.get() - if resp["return"]: - raise QmpException(resp["return"]) - - async def open(self): - """ - Open this connection. Connect to the machine ensure that the - connection is ready to use after this call. - """ - - loop = asyncio.get_event_loop() - await loop.create_unix_connection(self._protocol, self.addr) - - # wait for initial capabilities and version - init = await self.initq.get() - print(init) - - # negotioate capabilities - cmd = '{ "execute": "qmp_capabilities" }' - self.transport.write(bytes(cmd, "utf-8")) - await self._wait_check_return() - - return self - - def close(self): - """Close this connection""" - - if self.transport is not None: - self.transport.close() - self.transport = None - self.proto = None - - async def __aenter__(self): - await self.open() - return self - - async def __aexit__(self, exc_type, exc, tb): - self.close() - - async def cont(self): - """Continue machine execution if it has been paused""" - - cmd = '{ "execute": "cont" }' - self.transport.write(bytes(cmd, "utf-8")) - await self._wait_check_return() - - async def stop(self): - """Stop/pause machine execution""" - - cmd = '{ "execute": "stop" }' - self.transport.write(bytes(cmd, "utf-8")) - await self._wait_check_return() - - async def quit(self): - """ - Quit the emulation. This causes the emulator to (non-gracefully) - shut down and close. - """ - - cmd = '{ "execute": "quit" }' - self.transport.write(bytes(cmd, "utf-8")) - await self._wait_check_return() - - -class QmpProtocol(asyncio.Protocol): - """The QMP transport protocoll implementation""" - - def __init__(self, conn): - self.conn = conn - - def connection_made(self, transport): - self.conn.transport = transport - - def connection_lost(self, exc): - self.conn.transport = None - self.conn.proto = None - - def data_received(self, data): - data = str(data, "utf-8") - decoder = json.JSONDecoder() - nows = re.compile(r"[^\s]") - - pos = 0 - while True: - match = nows.search(data, pos) - if not match: - return - - pos = match.start() - obj, pos = decoder.raw_decode(data, pos) - - if "return" in obj: - self.conn.dataq.put_nowait(obj) - elif "QMP" in obj: - self.conn.initq.put_nowait(obj) - elif "event" in obj: - pass - else: - print("qmp:", obj) - - -class DataFrame: - """ - Basic protocol unit for communication via the IOX API introduced for - external device emulation - """ - - def __init__(self, seq, cat, id, data=None): - self.seq = seq - self.cat = cat - self.id = id - self.data = data - - def bytes(self): - """Convert this protocol unit to raw bytes""" - data = self.data if self.data is not None else [] - return bytes([self.seq, self.cat, self.id, len(data)]) + bytes(data) - - def __repr__(self): - return f"{{ seq: 0x{self.seq:02x}, cat: 0x{self.cat:02x}," \ - f" id: 0x{self.id:02x}, data: {self.data} }}" - - -def parse_dataframes(buf): - """Parse a variable number of DataFrames from the given byte buffer""" - - while len(buf) >= 4 and len(buf) >= 4 + buf[3]: - frame = DataFrame(buf[0], buf[1], buf[2], buf[4: 4 + buf[3]]) - buf = buf[4 + buf[3]:] - yield buf, frame - - return buf, None - - -class UsartStatusException(Exception): - """An exception returned by the USART send command""" - - def __init__(self, errn, *args, **kwargs): - Exception.__init__(self, f"USART error: {errno.errorcode[errn]}") - self.errno = errn # a UNIX error code indicating the reason - - -class Usart: - @staticmethod - async def create_async(addr): - return Usart(addr) - """Connection to emulate a USART device for a given QEMU/At91 instance""" - - def __init__(self, addr): - self.addr = addr - self.respd = dict() - self.respc = asyncio.Condition() - self.dataq = asyncio.Queue() - self.datab = bytes() - self.transport = None - self.proto = None - self.seq = 0 - - def _protocol(self): - """The underlying transport protocol""" - - if self.proto is None: - self.proto = UsartProtocol(self) - - return self.proto - - async def open(self): - """Open this connection""" - - loop = asyncio.get_event_loop() - await loop.create_unix_connection(self._protocol, self.addr) - return self - - def close(self): - """Close this connection""" - - if self.transport is not None: - self.transport.close() - self.transport = None - self.proto = None - - async def __aenter__(self): - await self.open() - return self - - async def __aexit__(self, exc_type, exc, tb): - self.close() - - def _send_new_frame(self, cat, cid, data=None): - """ - Send a DataFrame with the given parameters and auto-increase the - sequence counter. Return its sequence number. - """ - self.seq = (self.seq + 1) & 0x7F - - frame = DataFrame(self.seq, cat, cid, data) - self.transport.write(frame.bytes()) - - return frame.seq - - async def write(self, data): - """Write data (bytes) to the USART device""" - - seq = self._send_new_frame(IOX_CAT_DATA, IOX_CID_DATA_IN, data) - - async with self.respc: - while seq not in self.respd.keys(): - await self.respc.wait() - - resp = self.respd[seq] - del self.respd[seq] - - status = struct.unpack("I", resp.data)[0] - if status != 0: - raise UsartStatusException(status) - - async def read_async(self, n, timemout=1): - """Wait for 'n' bytes to be received from the USART - timeout in seconds""" - end_time = time.time()+timemout - - while len(self.datab) < n and time.time() < end_time: - frame = await self.dataq.get() - self.datab += frame.data - - data, self.datab = self.datab[:n], self.datab[n:] - return data - - def read(self, n): - """Wait for 'n' bytes to be received from the USART - timeout in seconds""" - - try: - while len(self.datab) < n: - frame = self.dataq.get_nowait() - self.datab += frame.data - # todo better solution - finally: - data, self.datab = self.datab[:n], self.datab[n:] - return data - - def new_data_available(self) -> bool: - return not self.dataq.empty() - - def flush(self): - while True: - try: - self.dataq.get_nowait() - except Exception as error: - print(error) - return - - def inject_overrun_error(self): - """Inject an overrun error (set CSR_OVRE)""" - self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_OVRE) - - def inject_frame_error(self): - """Inject a frame error (set CSR_FRAME)""" - self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_FRAME) - - def inject_parity_error(self): - """Inject a parity error (set CSR_PARE)""" - self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_PARE) - - def inject_timeout_error(self): - """Inject a timeout (set CSR_TIMEOUT)""" - self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_TIMEOUT) - - -class UsartProtocol(asyncio.Protocol): - """The USART transport protocoll implementation""" - - def __init__(self, conn): - self.conn = conn - self.buf = bytes() - - def connection_made(self, transport): - self.conn.transport = transport - - def connection_lost(self, exc): - self.conn.transport = None - self.conn.proto = None - - def data_received(self, data): - self.buf += data - - for buf, frame in parse_dataframes(self.buf): - self.buf = buf - - if frame.cat == IOX_CAT_DATA and frame.id == IOX_CID_DATA_OUT: - # data from CPU/board to device - self.conn.dataq.put_nowait(frame) - elif frame.cat == IOX_CAT_DATA and frame.id == IOX_CID_DATA_IN: - # response for data from device to CPU/board - loop = asyncio.get_event_loop() - loop.create_task(self._data_response_received(frame)) - - async def _data_response_received(self, frame): - async with self.conn.respc: - self.conn.respd[frame.seq] = frame - self.conn.respc.notify_all() - diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py deleted file mode 100644 index ccecaa04ed8f627b5b6f967f839aca637797f0ce..0000000000000000000000000000000000000000 --- a/comIF/obsw_serial_com_if.py +++ /dev/null @@ -1,180 +0,0 @@ -""" -@file obsw_serial_com_if.py -@brief Serial Communication Interface -@author R. Mueller -@date 01.11.2019 -""" -import time -import logging -from typing import Tuple -from enum import Enum - -import serial -import serial.tools.list_ports - -from comIF.obsw_com_interface import CommunicationInterface -from utility.obsw_tmtc_printer import TmTcPrinter -from tm.obsw_pus_tm_factory import PusTelemetryFactory, PusTmListT -from tc.obsw_pus_tc_base import PusTcInfoT -from utility.obsw_logger import get_logger - - -LOGGER = get_logger() -SERIAL_FRAME_LENGTH = 256 -HEADER_BYTES_BEFORE_SIZE = 5 - - -class SerialCommunicationType(Enum): - TIMEOUT_BASED = 0 - FIXED_FRAME_BASED = 1 - DLE_ENCODING = 2 - - -# pylint: disable=arguments-differ -class SerialComIF(CommunicationInterface): - """ - Communication Interface to use serial communication. This requires the PySerial library. - """ - def __init__(self, tmtc_printer: TmTcPrinter, com_port: str, baud_rate: int, - serial_timeout: float, - ser_com_type: SerialCommunicationType = SerialCommunicationType.FIXED_FRAME_BASED, - com_type_args: any = 0): - """ - Initiaze a serial communication handler. - :param tmtc_printer: TMTC printer object. Can be used for diagnostic purposes, but main - packet handling should be done by a separate thread. - :param com_port: Specify COM port. - :param baud_rate: Specify baud rate - :param serial_timeout: Specify serial timeout - :param ser_com_type: Specify how to handle serial reception - """ - super().__init__(tmtc_printer) - if com_port is None: - com_port = self.prompt_com_port() - self.valid = False - try: - self.serial = serial.Serial(port=com_port, baudrate=baud_rate, timeout=serial_timeout) - self.valid = True - except serial.SerialException: - LOGGER.error("Serial Port opening failure!") - - self.data = bytearray() - self.ser_com_type = ser_com_type - if self.ser_com_type == SerialCommunicationType.FIXED_FRAME_BASED: - self.serial_frame_size = com_type_args - elif self.ser_com_type == SerialCommunicationType.DLE_ENCODING: - # todo: We need a separate thread which does nothing but poll the serial interface for - # data (with the option to disable listening temporarily) and writes the data - # into a ring buffer (Python: deque with maxlen). We also need a lock because 2 - # threads use the deque - pass - - def prompt_com_port(self): - com_port = input( - "Serial Commuinication specified without COM port. Please enter COM Port" - "(enter h to display list of COM ports): ") - if com_port == 'h': - ports = serial.tools.list_ports.comports() - for port, desc, hwid in sorted(ports): - print("{}: {} [{}]".format(port, desc, hwid)) - com_port = self.prompt_com_port() - return com_port - - def close(self): - try: - self.serial.close() - except serial.SerialException as e: - logging.exception("Serial Port could not be closed! Traceback: " + str(e)) - - def send_data(self, data: bytearray): - self.serial.write(data) - - def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: - self.serial.write(tc_packet) - - def receive_telemetry(self, parameters: any = 0) -> PusTmListT: - (packet_received, packet_list) = self.poll_interface() - if packet_received: - return packet_list - return [] - - def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]: - if self.ser_com_type == SerialCommunicationType.FIXED_FRAME_BASED: - if self.data_available(): - self.data = self.serial.read(self.serial_frame_size) - pus_data_list = self.poll_pus_packets_fixed_frames(bytearray(self.data)) - packet_list = [] - for pus_packet in pus_data_list: - packet = PusTelemetryFactory.create(pus_packet) - packet_list.append(packet) - return True, packet_list - return False, [] - elif self.ser_com_type == SerialCommunicationType.DLE_ENCODING: - # todo: the listener thread will fill a deque with a maximum length ( = ring buffer). - # If the number of bytes contained is larger than 0, we have to analyze the - # buffer for STX and ETX chars. We should propably copy all bytes in deque - # into a member list and then start scanning for STX and ETX. The deque should - # be protected with a lock when performing copy operations. - LOGGER.warning("This communication type was not implemented yet!") - else: - LOGGER.warning("This communication type was not implemented yet!") - - def data_available(self, timeout: float = 0) -> bool: - if self.serial.in_waiting > 0: - return True - if timeout > 0: - start_time = time.time() - elapsed_time = 0 - while elapsed_time < timeout: - if self.serial.in_waiting > 0: - return True - elapsed_time = time.time() - start_time - return False - - @staticmethod - def poll_pus_packets_fixed_frames(data: bytearray) -> list: - pus_data_list = [] - if len(data) == 0: - return pus_data_list - - payload_length = data[4] << 8 | data[5] - packet_size = payload_length + 7 - if payload_length == 0: - return [] - read_size = len(data) - pus_data = data[0:packet_size] - pus_data_list.append(pus_data) - - SerialComIF.read_multiple_packets(data, packet_size, read_size, pus_data_list) - return pus_data_list - - @staticmethod - def read_multiple_packets(data: bytearray, start_index: int, frame_size: int, - pus_data_list: list): - while start_index < frame_size: - start_index = SerialComIF.parse_next_packets(data, start_index, frame_size, - pus_data_list) - - @staticmethod - def parse_next_packets(data: bytearray, start_index: int, frame_size: int, - pus_data_list: list) -> int: - next_payload_len = data[start_index + 4] << 8 | data[start_index + 5] - if next_payload_len == 0: - end_index = frame_size - return end_index - next_packet_size = next_payload_len + 7 - # remaining_size = frame_size - start_index - - if next_packet_size > SERIAL_FRAME_LENGTH: - LOGGER.error("PUS Polling: Very large packet detected, " - "packet splitting not implemented yet!") - LOGGER.error("Detected Size: " + str(next_packet_size)) - end_index = frame_size - return end_index - - end_index = start_index + next_packet_size - pus_data = data[start_index:end_index] - pus_data_list.append(pus_data) - return end_index - - diff --git a/gui/obsw_backend_test.py b/gui/obsw_backend_test.py index 9ea2bfbfe8542bee214306aeaf9c2f589a9ceec3..c21caeec5ea9d5dacbc119a6c49e66b27237083a 100644 --- a/gui/obsw_backend_test.py +++ b/gui/obsw_backend_test.py @@ -1,6 +1,6 @@ from multiprocessing.connection import Listener from multiprocessing import Process -from utility.obsw_logger import get_logger +from tmtc_core.utility.obsw_logger import get_logger import logging diff --git a/gui/obsw_tmtc_gui.py b/gui/obsw_tmtc_gui.py index f564b36b95d99931abbbde431a1854695cd4d42a..8c322d5e8e7f54f2ba7a23b5dd9b6209624db546 100644 --- a/gui/obsw_tmtc_gui.py +++ b/gui/obsw_tmtc_gui.py @@ -16,7 +16,7 @@ import tkinter as tk from multiprocessing.connection import Client from multiprocessing import Process -from utility.obsw_logger import get_logger +from tmtc_core.utility.obsw_logger import get_logger import time LOGGER = get_logger() diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 75e7dafc2d23c236189dc51943a9bddc9a18f0ae..ffa9b5307ece5503444bdf988a33120f858d7c23 100755 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -64,20 +64,20 @@ from test import obsw_pus_service_test from config import obsw_config as g from config.obsw_config import set_globals from tc.obsw_pus_tc_packer import create_total_tc_queue, ServiceQueuePacker -from tc.obsw_pus_tc_base import PusTcInfo +from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo from obsw_user_code import command_preparation_hook -from sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver -from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver -from sendreceive.obsw_tm_listener import TmListener +from tmtc_core.sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver +from tmtc_core.sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver +from tmtc_core.sendreceive.obsw_tm_listener import TmListener from utility.obsw_args_parser import parse_input_arguments from utility.obsw_tmtc_printer import TmTcPrinter -from utility.obsw_exit_handler import keyboard_interrupt_handler -from utility.obsw_logger import set_tmtc_logger, get_logger +from tmtc_core.utility.obsw_exit_handler import keyboard_interrupt_handler +from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger from utility.obsw_binary_uploader import perform_binary_upload -from comIF.obsw_com_config import set_communication_interface +from tmtc_core.comIF.obsw_com_config import set_communication_interface from gui.obsw_tmtc_gui import TmTcGUI from gui.obsw_backend_test import TmTcBackend diff --git a/obsw_user_code.py b/obsw_user_code.py index 6b6f6639c3e2014a54e00669714820ea2d2b515d..317841115461352340d6cfcc04e672d6e14c6327 100644 --- a/obsw_user_code.py +++ b/obsw_user_code.py @@ -1,9 +1,8 @@ """ User defined code can be added here. """ -import socket from typing import Union, Tuple -from tc.obsw_pus_tc_base import PusTcInfo +from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo from enum import Enum @@ -38,7 +37,7 @@ def global_setup_hook(): def prepare_robins_commands(): - from tc.obsw_pus_tc_base import PusTelecommand + from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand command = PusTelecommand(service=17, subservice=1, ssc=20) return command.pack_command_tuple() diff --git a/sendreceive/__init__.py b/sendreceive/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/sendreceive/obsw_command_sender_receiver.py b/sendreceive/obsw_command_sender_receiver.py deleted file mode 100644 index d91a0f7a31815253dfa4c67a61345abf7aedf25a..0000000000000000000000000000000000000000 --- a/sendreceive/obsw_command_sender_receiver.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -Program: obsw_module_test.py -Date: 01.11.2019 -Description: All functions related to TmTc Sending and Receiving, used by UDP client - -Manual: -Set up the UDP client as specified in the header comment and use the unit testing mode - -A separate thread is used to listen for replies and send a new telecommand -if the first reply has not been received. - -@author: R. Mueller -""" -import time - -import config.obsw_config as g -from comIF.obsw_com_interface import CommunicationInterface -from utility.obsw_tmtc_printer import TmTcPrinter -from utility.obsw_logger import get_logger - -from sendreceive.obsw_tm_listener import TmListener -from tc.obsw_pus_tc_base import TcQueueEntryT -from tm.obsw_pus_tm_factory import PusTmQueueT - - -logger = get_logger() - - -# pylint: disable=too-many-instance-attributes -class CommandSenderReceiver: - """ - This is the generic CommandSenderReceiver object. All TMTC objects inherit this object, - for example specific implementations (e.g. SingleCommandSenderReceiver) - """ - def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinter, - tm_listener: TmListener): - """ - :param com_interface: CommunicationInterface object. Instantiate the desired one - and pass it here - :param tmtc_printer: TmTcPrinter object. Instantiate it and pass it here. - """ - self._tm_timeout = g.G_TM_TIMEOUT - self._tc_send_timeout_factor = g.G_TC_SEND_TIMEOUT_FACTOR - - if isinstance(com_interface, CommunicationInterface): - self._com_interface = com_interface - else: - logger.error("CommandSenderReceiver: Invalid communication interface!") - raise TypeError("CommandSenderReceiver: Invalid communication interface!") - - if isinstance(tmtc_printer, TmTcPrinter): - self._tmtc_printer = tmtc_printer - else: - logger.error("CommandSenderReceiver: Invalid TMTC printer!") - raise TypeError("CommandSenderReceiver: Invalid TMTC printer!") - - if isinstance(tm_listener, TmListener): - self._tm_listener = tm_listener - else: - logger.error("CommandSenderReceiver: Invalid TM listener!") - raise TypeError("Invalid TM Listener!") - - self._start_time = 0 - self._elapsed_time = 0 - self._timeout_counter = 0 - - # needed to store last actual TC packet from queue - self._last_tc = bytearray() - self._last_tc_info = dict() - - # this flag can be used to notify when the operation is finished - self._operation_pending = False - # This flag can be used to notify when a reply was received. - self._reply_received = False - - def set_tm_timeout(self, tm_timeout: float = g.G_TM_TIMEOUT): - """ - Set the TM timeout. Usually, the global value set by the args parser is set, - but the TM timeout can be reset (e.g. for slower architectures) - :param tm_timeout: New TM timeout value as a float value in seconds - :return: - """ - self._tm_timeout = tm_timeout - - def set_tc_send_timeout_factor(self, new_factor: float = g.G_TC_SEND_TIMEOUT_FACTOR): - """ - Set the TC resend timeout factor. After self._tm_timeout * new_factor seconds, - a telecommand will be resent again. - :param new_factor: Factor as a float number - :return: - """ - self._tc_send_timeout_factor = new_factor - - def _check_for_first_reply(self) -> None: - """ - Checks for replies. If no reply is received, send telecommand again in checkForTimeout() - :return: None - """ - if self._tm_listener.event_reply_received.is_set(): - self._reply_received = True - self._operation_pending = False - self._tm_listener.event_reply_received.clear() - else: - self._check_for_timeout() - - def check_queue_entry(self, tc_queue_entry: TcQueueEntryT) -> bool: - """ - Checks whether the entry in the tc queue is a telecommand. - The last telecommand and respective information are stored in _last_tc - and _last_tc_info - :param tc_queue_entry: - :return: True if queue entry is telecommand, False if it is not - """ - queue_entry_first, queue_entry_second = tc_queue_entry - queue_entry_is_telecommand = False - if queue_entry_first == "wait": - wait_time = queue_entry_second - self._tm_timeout = self._tm_timeout + wait_time - time.sleep(wait_time) - # printout optimized for logger and debugging - elif queue_entry_first == "print": - print_string = queue_entry_second - print() - self._tmtc_printer.print_string(print_string, True) - elif queue_entry_first == "rawprint": - print_string = queue_entry_second - self._tmtc_printer.print_string(print_string, False) - elif queue_entry_first == "export": - export_name = queue_entry_second - self._tmtc_printer.add_print_buffer_to_buffer_list() - self._tmtc_printer.print_to_file(export_name, True) - elif queue_entry_first == "timeout": - self._tm_timeout = queue_entry_second - else: - self._last_tc, self._last_tc_info = (queue_entry_first, queue_entry_second) - return True - return queue_entry_is_telecommand - - def _check_for_timeout(self): - """ - Checks whether a timeout after sending a telecommand has occured and sends telecommand - again. If resending reached certain counter, exit the program. - :return: - """ - if self._start_time == 0: - self._start_time = time.time() - if self._timeout_counter == 5: - logger.info("CommandSenderReceiver: No response from command !") - self._operation_pending = False - if self._start_time != 0: - self._elapsed_time = time.time() - self._start_time - if self._elapsed_time >= self._tm_timeout * self._tc_send_timeout_factor: - if g.G_RESEND_TC: - logger.info("CommandSenderReceiver: Timeout, sending TC again !") - self._com_interface.send_telecommand(self._last_tc, self._last_tc_info) - self._timeout_counter = self._timeout_counter + 1 - self._start_time = time.time() - else: - # todo: we could also stop sending and clear the TC queue - self._reply_received = True - time.sleep(0.5) - - def print_tm_queue(self, tm_queue: PusTmQueueT): - for tm_packet_list in tm_queue: - try: - for tm_packet in tm_packet_list: - self._tmtc_printer.print_telemetry(tm_packet) - except AttributeError as e: - logger.exception( - "CommandSenderReceiver Exception: Invalid queue entry. Traceback:", e) diff --git a/sendreceive/obsw_multiple_commands_sender_receiver.py b/sendreceive/obsw_multiple_commands_sender_receiver.py deleted file mode 100644 index 304679fcedc64abbd997bd2ccdde56198a51886f..0000000000000000000000000000000000000000 --- a/sendreceive/obsw_multiple_commands_sender_receiver.py +++ /dev/null @@ -1,122 +0,0 @@ -""" -@brief Used to send multiple TCs as bursts and listen for replies simultaneously. - Used by Module Tester -""" -import sys -import time -from typing import Union, Deque -from collections import deque - -from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver -from comIF.obsw_com_interface import CommunicationInterface -from utility.obsw_tmtc_printer import TmTcPrinter -from sendreceive.obsw_tm_listener import TmListener -import config.obsw_config as g -from utility.obsw_tmtc_printer import get_logger - -LOGGER = get_logger() - - -class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): - """ - Difference to seqential sender: This class can send TCs in bursts. - Wait intervals can be specified with wait time between the send bursts. - This is generally done in the separate test classes in UnitTest - """ - def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinter, - tc_queue: Deque, tm_listener: TmListener, wait_intervals: list, - wait_time: Union[float, list], print_tm: bool): - """ - TCs are sent in burst when applicable. Wait intervals can be specified by supplying - respective arguments - :param com_interface: - :param tmtc_printer: - :param tc_queue: - :param wait_intervals: List of pause intervals. For example [1,3] means that a wait_time - is applied after - sendinf the first and the third telecommand - :param wait_time: List of wait times or uniform wait time as float - :param print_tm: - """ - super().__init__(com_interface=com_interface, tmtc_printer=tmtc_printer, - tm_listener=tm_listener, tc_queue=tc_queue) - self.waitIntervals = wait_intervals - self.waitTime = wait_time - self.printTm = print_tm - self.tm_packet_queue = deque() - self.tc_info_queue = deque() - self.pusPacketInfo = [] - self.pusPacket = [] - self.waitCounter = 0 - - def send_tc_queue_and_return_info(self): - try: - self._tm_listener.mode_id = g.ModeList.UnitTest - self._tm_listener.event_mode_change.set() - time.sleep(0.1) - # TC info queue is set in this function - self.__send_all_queue() - self.wait_for_last_replies_listening(self._tm_timeout / 1.4) - # Get a copy of the queue, otherwise we will lose the data. - tm_packet_queue_list = self._tm_listener.retrieve_tm_packet_queue().copy() - if g.G_PRINT_TM: - self.print_tm_queue(self._tm_listener.retrieve_tm_packet_queue()) - self._tm_listener.clear_tm_packet_queue() - self._tm_listener.event_mode_op_finished.set() - if g.G_PRINT_TO_FILE: - self._tmtc_printer.print_to_file() - return self.tc_info_queue, tm_packet_queue_list - except (KeyboardInterrupt, SystemExit): - LOGGER.info("Closing TMTC Client") - sys.exit() - - def __handle_tc_resending(self): - while not self.__all_replies_received: - if self._tc_queue.__len__ == 0: - if self._start_time == 0: - self._start_time = time.time() - self._check_for_timeout() - - def __send_all_queue(self): - while not self._tc_queue.__len__() == 0: - self.__send_and_print_tc() - - def __send_and_print_tc(self): - tc_queue_tuple = self._tc_queue.pop() - if self.check_queue_entry(tc_queue_tuple): - pus_packet, pus_packet_info = tc_queue_tuple - self.tc_info_queue.append(pus_packet_info) - self._com_interface.send_telecommand(pus_packet, pus_packet_info) - self.__handle_waiting() - - def __handle_waiting(self): - self.waitCounter = self.waitCounter + 1 - if self.waitCounter in self.waitIntervals: - if isinstance(self.waitTime, list): - time.sleep(self.waitTime[self.waitIntervals.index(self.waitCounter)]) - else: - time.sleep(self.waitTime) - - def __retrieve_listener_tm_packet_queue(self): - if self._tm_listener.event_reply_received.is_set(): - return self._tm_listener.retrieve_tm_packet_queue() - else: - LOGGER.error("Multiple Command SenderReceiver: Configuration error, " - "reply event not set in TM listener") - - def __clear_listener_tm_info_queue(self): - self._tm_listener.clear_tm_packet_queue() - - @staticmethod - def wait_for_last_replies_listening(wait_time: float): - elapsed_time_seconds = 0 - start_time = time.time() - while elapsed_time_seconds < wait_time: - elapsed_time_seconds = time.time() - start_time - - - - - - - diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py deleted file mode 100644 index 1d107ce7281549071c3078daffc98082debc68b1..0000000000000000000000000000000000000000 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/python3.8 -""" -@file obsw_sequential_sender_receiver.py -@date 01.11.2019 -@brief Used to send multiple TCs in sequence and listen for replies after each sent TC -""" -import sys -import time - -import config.obsw_config as g -from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver -from sendreceive.obsw_tm_listener import TmListener -from comIF.obsw_com_interface import CommunicationInterface -from utility.obsw_tmtc_printer import TmTcPrinter -from utility.obsw_logger import get_logger -from tc.obsw_pus_tc_base import TcQueueT - -LOGGER = get_logger() - - -class SequentialCommandSenderReceiver(CommandSenderReceiver): - """ - Specific implementation of CommandSenderReceiver to send multiple telecommands in sequence - """ - def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinter, - tm_listener: TmListener, tc_queue: TcQueueT): - """ - :param com_interface: CommunicationInterface object, passed on to CommandSenderReceiver - :param tm_listener: TmListener object which runs in the background and receives - all Telemetry - :param tmtc_printer: TmTcPrinter object, passed on to CommandSenderReceiver - for this time period - """ - super().__init__(com_interface=com_interface, tmtc_printer=tmtc_printer, - tm_listener=tm_listener) - self._tc_queue = tc_queue - self.__first_reply_received = False - self.__all_replies_received = False - self.__mode_op_finished = False - - def send_queue_tc_and_receive_tm_sequentially(self): - """ - Primary function which is called for sequential transfer. - :return: - """ - self._tm_listener.mode_id = g.ModeList.ServiceTestMode - self._tm_listener.event_mode_change.set() - # tiny delay for tm listener - time.sleep(0.1) - self.__send_and_receive_first_packet() - # this flag is set in the separate thread ! - try: - self.__handle_tc_sending() - except (KeyboardInterrupt, SystemExit): - LOGGER.info("Closing TMTC Client") - sys.exit() - - def __handle_tc_sending(self): - while not self.__all_replies_received: - while not self._tc_queue.__len__() == 0: - self.__perform_next_tc_send() - if self._tc_queue.__len__() == 0: - self._start_time = time.time() - self._check_for_timeout() - if not self.__mode_op_finished: - self._tm_listener.event_mode_op_finished.set() - self.__mode_op_finished = True - LOGGER.info("SequentialSenderReceiver: All replies received!") - - def __perform_next_tc_send(self): - if self._tm_listener.event_reply_received.is_set(): - self._reply_received = True - self._tm_listener.event_reply_received.clear() - # this flag is set in the separate receiver thread too - if self._reply_received: - self.print_tm_queue(self._tm_listener.retrieve_tm_packet_queue()) - self._tm_listener.clear_tm_packet_queue() - self.__send_next_telecommand() - self._reply_received = False - # just calculate elapsed time if start time has already been set (= command has been sent) - else: - self._check_for_timeout() - - def __send_and_receive_first_packet(self): - tc_queue_tuple = self._tc_queue.pop() - if self.check_queue_entry(tc_queue_tuple): - pus_packet, pus_packet_info = tc_queue_tuple - self._com_interface.send_telecommand(pus_packet, pus_packet_info) - else: - self.__send_and_receive_first_packet() - - def __send_next_telecommand(self): - tc_queue_tuple = self._tc_queue.pop() - if self.check_queue_entry(tc_queue_tuple): - self._start_time = time.time() - pus_packet, pus_packet_info = tc_queue_tuple - self._com_interface.send_telecommand(pus_packet, pus_packet_info) - elif self._tc_queue.__len__() == 0: - # Special case: Last queue entry is not a Telecommand - self.__all_replies_received = True - else: - # If the queue entry was not a telecommand, send next telecommand - self.__perform_next_tc_send() diff --git a/sendreceive/obsw_single_command_sender_receiver.py b/sendreceive/obsw_single_command_sender_receiver.py deleted file mode 100644 index 88f271648ca176823a7b5d190e410da44068acb9..0000000000000000000000000000000000000000 --- a/sendreceive/obsw_single_command_sender_receiver.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/python3.8 -""" -@file - obsw_config.py -@date - 01.11.2019 -@brief - Used to send single tcs and listen for replies after that -""" - -from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver -from sendreceive.obsw_tm_listener import TmListener - -from comIF.obsw_com_interface import CommunicationInterface - -from utility.obsw_tmtc_printer import TmTcPrinter -from utility.obsw_logger import get_logger - -from tc.obsw_pus_tc_base import PusTcTupleT -import config.obsw_config as g - - - -logger = get_logger() - -class SingleCommandSenderReceiver(CommandSenderReceiver): - """ - Specific implementation of CommandSenderReceiver to send a single telecommand - This object can be used by instantiating it and calling sendSingleTcAndReceiveTm() - """ - def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinter, - tm_listener: TmListener): - """ - :param com_interface: CommunicationInterface object, passed on to CommandSenderReceiver - :param tm_listener: TmListener object which runs in the background and receives all TM - :param tmtc_printer: TmTcPrinter object, passed on to CommandSenderReceiver - """ - super().__init__(com_interface=com_interface, tm_listener=tm_listener, - tmtc_printer=tmtc_printer) - - - def send_single_tc_and_receive_tm(self, pus_packet_tuple: PusTcTupleT): - """ - Send a single telecommand passed to the class and wait for replies - :return: - """ - try: - pus_packet, pus_packet_info = pus_packet_tuple - except TypeError: - logger.error("SingleCommandSenderReceiver: Invalid command input") - return - self._operation_pending = True - self._tm_listener.mode_id = g.ModeList.SingleCommandMode - self._tm_listener.event_mode_change.set() - self._tmtc_printer.print_telecommand(pus_packet, pus_packet_info) - self._com_interface.send_telecommand(pus_packet, pus_packet_info) - self._last_tc = pus_packet - self._last_tc_info = pus_packet_info - while self._operation_pending: - # wait until reply is received - super()._check_for_first_reply() - if self._reply_received: - self._tm_listener.event_mode_op_finished.set() - self.print_tm_queue(self._tm_listener.retrieve_tm_packet_queue()) - logger.info("SingleCommandSenderReceiver: Reply received") - logger.info("Listening for packages ...") diff --git a/sendreceive/obsw_tm_listener.py b/sendreceive/obsw_tm_listener.py deleted file mode 100644 index 882cd3a6a289b1760c1d9fdb804e5fb7777f9654..0000000000000000000000000000000000000000 --- a/sendreceive/obsw_tm_listener.py +++ /dev/null @@ -1,205 +0,0 @@ -""" -@file -obsw_tm_listener.py -@date -01.11.2019 -@brief -Separate class to listen to telecommands. -@author -R. Mueller -""" -import sys -import time -import threading -from collections import deque -from typing import TypeVar -from utility.obsw_logger import get_logger -from comIF.obsw_com_interface import CommunicationInterface -from tm.obsw_pus_tm_factory import PusTmQueueT, PusTmInfoQueueT -import config.obsw_config as g - -LOGGER = get_logger() - - -class TmListener: - MODE_OPERATION_TIMEOUT = 300 - """ - Performs all TM listening operations. - This listener can be used by setting the modeChangeEvent Event with the set() function - and changing the mode to do special mode operations. The mode operation ends as soon - the modeOpFinished Event is set() ! - """ - def __init__(self, com_interface: CommunicationInterface, tm_timeout: float, - tc_timeout_factor: float): - self.tm_timeout = tm_timeout - self.tc_timeout_factor = tc_timeout_factor - self.com_interface = com_interface - # this will be the default mode (listener mode) - self.mode_id = g.ModeList.ListenerMode - # TM Listener operations can be suspended by setting this flag - self.event_listener_active = threading.Event() - self.event_listener_active.set() - # I don't think a listener is useful without the main program, - # so we might just declare it daemonic. - # UPDATE: Right now, the main program is not in a permanent loop and setting the - # thread daemonic will cancel the program. - # Solved for now by setting a permanent loop at the end of the main program - self.listener_thread = threading.Thread(target=self.__perform_operation, daemon=True) - self.lock_listener = threading.Lock() - # This Event is set by sender objects to perform mode operations - self.event_mode_change = threading.Event() - # This Event is set by sender objects if all necessary operations are done - # to transition back to listener mode - self.event_mode_op_finished = threading.Event() - # maybe we will just make the thread daemonic... - # self.terminationEvent = threading.Event() - # This Event is set and cleared by the listener to inform the sender objects - # if a reply has been received - self.event_reply_received = threading.Event() - # Will be filled for the Unit Test - # TODO: Maybe its better to use a normal queue, which (by-design) offers more thread-safety - # Then we also don't need a lock. But I think its okay to treat the tm packet queue - # as a regular data structure for now. - # Then we propably have to pop the queue entries and put them into a list - # in the main program - self.__tm_packet_queue = deque() - - def start(self): - self.listener_thread.start() - - def tm_received(self): - """ - This function is used to check whether any data has been received - """ - if self.__tm_packet_queue.__len__() > 0: - return True - else: - return False - - def retrieve_tm_packet_queue(self) -> PusTmQueueT: - if self.lock_listener.acquire(True, timeout=1): - tm_queue_copy = self.__tm_packet_queue.copy() - else: - tm_queue_copy = self.__tm_packet_queue.copy() - LOGGER.critical("TmListener: Blocked on lock acquisition for longer than 1 second!") - self.lock_listener.release() - return tm_queue_copy - - def clear_tm_packet_queue(self): - self.__tm_packet_queue.clear() - - def __perform_operation(self): - while True: - if self.event_listener_active.is_set(): - self.__default_operation() - else: - time.sleep(1) - - def __default_operation(self): - """ - Core function. Normally, polls all packets - """ - self.__perform_core_operation() - if self.event_mode_change.is_set(): - self.event_mode_change.clear() - start_time = time.time() - while not self.event_mode_op_finished.is_set(): - elapsed_time = time.time() - start_time - if elapsed_time < TmListener.MODE_OPERATION_TIMEOUT: - self.__perform_mode_operation() - else: - LOGGER.warning("TmListener: Mode operation timeout occured!") - break - self.event_mode_op_finished.clear() - LOGGER.info("TmListener: Transitioning to listener mode.") - self.mode_id = g.ModeList.ListenerMode - - def __perform_core_operation(self): - packet_list = self.com_interface.receive_telemetry() - if len(packet_list) > 0: - if self.lock_listener.acquire(True, timeout=1): - self.__tm_packet_queue.append(packet_list) - else: - LOGGER.error("TmListener: Blocked on lock acquisition!") - self.lock_listener.release() - - def __perform_mode_operation(self): - """ - By setting the modeChangeEvent with set() and specifying the mode variable, - the TmListener is instructed to perform certain operations. - :return: - """ - # Listener Mode - if self.mode_id == g.ModeList.ListenerMode: - self.event_mode_op_finished.set() - # Single Command Mode - elif self.mode_id == g.ModeList.SingleCommandMode: - # Listen for one reply sequence. - if self.check_for_one_telemetry_sequence(): - # Set reply event, will be cleared by checkForFirstReply() - self.event_reply_received.set() - # Sequential Command Mode - elif self.mode_id == g.ModeList.ServiceTestMode or \ - self.mode_id == g.ModeList.SoftwareTestMode: - if self.check_for_one_telemetry_sequence(): - LOGGER.info("TmListener: Reply sequence received!") - self.event_reply_received.set() - elif self.mode_id == g.ModeList.UnitTest: - # TODO: needs to be reworked. Info queue is stupid. just pop the normal queue and - # pack the information manually - self.__perform_core_operation() - - def check_for_one_telemetry_sequence(self) -> bool: - """ - Receive all telemetry for a specified time period. - This function prints the telemetry sequence but does not return it. - :return: - """ - tm_ready = self.com_interface.data_available(self.tm_timeout * self.tc_timeout_factor) - if tm_ready is False: - return False - elif tm_ready is True: - return self.__read_telemetry_sequence() - else: - LOGGER.error("TmListener: Configuration error in communication interface!") - sys.exit() - - def __read_telemetry_sequence(self): - """ - Thread-safe implementation for reading a telemetry sequence. - """ - if self.lock_listener.acquire(True, timeout=1): - self.__tm_packet_queue.append(self.com_interface.receive_telemetry()) - else: - LOGGER.error("TmListener: Blocked on lock acquisition for longer than 1 second!") - self.lock_listener.release() - start_time = time.time() - elapsed_time = 0 - LOGGER.info("TmListener: Listening for " + str(self.tm_timeout) + " seconds") - while elapsed_time < self.tm_timeout: - tm_ready = self.com_interface.data_available(1.0) - if tm_ready: - if self.lock_listener.acquire(True, timeout=1): - tm_list = self.com_interface.receive_telemetry() - self.__tm_packet_queue.append(tm_list) - else: - LOGGER.critical("TmListener: Blocked on lock acquisition for longer than 1 second!") - self.lock_listener.release() - elapsed_time = time.time() - start_time - # the timeout value can be set by special TC queue entries if wiretapping_packet handling - # takes longer, but it is reset here to the global value - if self.tm_timeout is not g.G_TM_TIMEOUT: - self.tm_timeout = g.G_TM_TIMEOUT - return True - - @staticmethod - def retrieve_info_queue_from_packet_queue( - tm_queue: PusTmQueueT, pus_info_queue_to_fill: PusTmInfoQueueT): - tm_queue_copy = tm_queue.copy() - while tm_queue_copy.__len__() != 0: - pus_packet_list = tm_queue_copy.pop() - for pus_packet in pus_packet_list: - pus_info_queue_to_fill.append(pus_packet.pack_tm_information()) - - - diff --git a/tc/obsw_pus_tc_base.py b/tc/obsw_pus_tc_base.py deleted file mode 100644 index bac256cffac9b41db52da4c30fb98044f69a7a40..0000000000000000000000000000000000000000 --- a/tc/obsw_pus_tc_base.py +++ /dev/null @@ -1,215 +0,0 @@ -""" -@file obsw_pus_tc_base.py -@brief This module contains the PUS telemetry class representation to - deserialize raw PUS telemetry packets. -@author R. Mueller -""" -from enum import Enum -from typing import Dict, Union, Tuple, Deque -import crcmod -from utility.obsw_logger import get_logger - -LOGGER = get_logger() - - -class TcDictionaryKeys(Enum): - """ Keys for telecommand dictionary """ - SERVICE = 1 - SUBSERVICE = 2 - SSC = 3 - PACKET_ID = 4 - DATA = 5 - - -PusTcInfo = Dict[TcDictionaryKeys, any] -PusTcInfoT = Union[PusTcInfo, None] -PusTcTupleT = Tuple[bytearray, PusTcInfoT] -TcAuxiliaryTupleT = Tuple[str, any] -TcQueueEntryT = Union[TcAuxiliaryTupleT, PusTcTupleT] -TcQueueT = Deque[TcQueueEntryT] -PusTcInfoQueueT = Deque[PusTcInfoT] - - -# pylint: disable=too-many-instance-attributes -# pylint: disable=too-many-arguments -class PusTelecommand: - """ - @brief Class representation of a PUS telecommand. - @details - It can be used to pack a raw telecommand from - input parameters. The structure of a PUS telecommand is specified in ECSS-E-70-41A on p.42 - and is also shown below (bottom) - """ - HEADER_SIZE = 6 - - def __init__(self, service: int, subservice: int, ssc=0, app_data: bytearray = bytearray([]), - source_id: int = 0, version: int = 0, apid: int = 0x73): - """ - Initiates a telecommand with the given parameters. - """ - packet_type = 1 - data_field_header_flag = 1 - self.packet_id_bytes = [0x0, 0x0] - self.packet_id_bytes[0] = ((version << 5) & 0xE0) | ((packet_type & 0x01) << 4) | \ - ((data_field_header_flag & 0x01) << 3) | ((apid & 0x700) >> 8) - self.packet_id_bytes[1] = apid & 0xFF - self.packet_id = (self.packet_id_bytes[0] << 8) | self.packet_id_bytes[1] - self.ssc = ssc - self.psc = (ssc & 0x3FFF) | (0xC0 << 8) - self.pus_version_and_ack_byte = 0b0001_1111 - self.service = service - self.subservice = subservice - self.source_id = source_id - self.app_data = app_data - - def __repr__(self): - """ - Returns the representation of a class instance. - TODO: Maybe a custom ID or dict consisting of SSC, Service and Subservice would be better. - """ - return self.ssc - - def __str__(self): - """ - Returns string representation of a class instance. - """ - return "TC[" + str(self.service) + "," + str(self.subservice) + "] " + " with SSC " + \ - str(self.ssc) - - def get_data_length(self) -> int: - """ - Retrieve size of TC packet in bytes. - Formula according to PUS Standard: C = (Number of octets in packet data field) - 1. - The size of the TC packet is the size of the packet secondary header with - source ID + the length of the application data + length of the CRC16 checksum - 1 - """ - try: - data_length = 4 + len(self.app_data) + 1 - return data_length - except TypeError: - LOGGER.error("PusTelecommand: Invalid type of application data!") - return 0 - - def get_total_length(self): - """ - Length of full packet in bytes. - The header length is 6 bytes and the data length + 1 is the size of the data field. - """ - return self.get_data_length() + PusTelecommand.HEADER_SIZE + 1 - - def pack(self) -> bytearray: - """ - Serializes the TC data fields into a bytearray. - """ - data_to_pack = bytearray() - data_to_pack.append(self.packet_id_bytes[0]) - data_to_pack.append(self.packet_id_bytes[1]) - data_to_pack.append((self.psc & 0xFF00) >> 8) - data_to_pack.append(self.psc & 0xFF) - length = self.get_data_length() - data_to_pack.append((length & 0xFF00) >> 8) - data_to_pack.append(length & 0xFF) - data_to_pack.append(self.pus_version_and_ack_byte) - data_to_pack.append(self.service) - data_to_pack.append(self.subservice) - data_to_pack.append(self.source_id) - data_to_pack += self.app_data - crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - crc = crc_func(data_to_pack) - - data_to_pack.append((crc & 0xFF00) >> 8) - data_to_pack.append(crc & 0xFF) - return data_to_pack - - def pack_information(self) -> PusTcInfoT: - """ - Packs TM information into a dictionary. - """ - tc_information = { - TcDictionaryKeys.SERVICE: self.service, - TcDictionaryKeys.SUBSERVICE: self.subservice, - TcDictionaryKeys.SSC: self.ssc, - TcDictionaryKeys.PACKET_ID: self.packet_id, - TcDictionaryKeys.DATA: self.app_data - } - return tc_information - - def pack_command_tuple(self) -> PusTcTupleT: - """ Pack a tuple consisting of the raw packet and an information dictionary """ - command_tuple = (self.pack(), self.pack_information()) - return command_tuple - - def print(self): - """ Print the raw command in a clean format. """ - packet = self.pack() - print("Command in Hexadecimal: [", end="") - for counter in range(len(packet)): - if counter == len(packet) - 1: - print(str(hex(packet[counter])), end="") - else: - print(str(hex(packet[counter])) + ", ", end="") - print("]") - - -def generate_packet_crc(tc_packet: bytearray) -> bytearray: - """ - Takes pusPackets, removes current Packet Error Control, calculates new - CRC16 checksum and adds it as correct Packet Error Control Code. - Reference: ECSS-E70-41A p. 207-212 - """ - crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - crc = crc_func(bytearray(tc_packet[0:len(tc_packet) - 2])) - tc_packet[len(tc_packet) - 2] = (crc & 0xFF00) >> 8 - tc_packet[len(tc_packet) - 1] = crc & 0xFF - return tc_packet - - -def generate_crc(data: bytearray) -> bytearray: - """ - Takes the application data, appends the CRC16 checksum and returns resulting bytearray - """ - data_with_crc = bytearray() - data_with_crc += data - crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - crc = crc_func(data) - data_with_crc.append((crc & 0xFF00) >> 8) - data_with_crc.append(crc & 0xFF) - return data_with_crc - -# pylint: disable=line-too-long - -# Structure of a PUS TC Packet : -# A PUS wiretapping_packet consists of consecutive bits, the allocation and structure is standardised. -# Extended information can be found in ECSS-E-70-41A on p.42 -# The easiest form to send a PUS Packet is in hexadecimal form. -# A two digit hexadecimal number equals one byte, 8 bits or one octet -# o = optional, Srv = Service -# -# The structure is shown as follows for TC[17,1] -# 1. Structure Header -# 2. Structure Subheader -# 3. Component (size in bits) -# 4. Hexadecimal number -# 5. Binary Number -# 6. Decimal Number -# -# -------------------------------------------Packet Header(48)------------------------------------------| Packet | -# ----------------Packet ID(16)----------------------|Packet Sequence Control (16)| Packet Length (16) | Data Field | -# Version | Type(1) |Data Field |APID(11) | SequenceFlags(2) |Sequence | | (Variable) | -# Number(3) | |Header Flag(1)| | |Count(14)| | | -# 0x18 | 0x73 | 0xc0 | 0x19 | 0x00 | 0x04 | | -# 000 1 1 000| 01110011 | 11 000000 | 00011001|00000000 | 0000100 | | -# 0 | 1 | 1 | 115(ASCII s) | 3 | 25 | 0 | 4 | | -# -# - Packet Length is an unsigned integer C = Number of Octets in Packet Data Field - 1 -# -# Packet Data Field Structure: -# -# ------------------------------------------------Packet Data Field------------------------------------------------- | -# ---------------------------------Data Field Header ---------------------------|AppData|Spare| PacketErrCtr | -# CCSDS(1)|TC PUS Ver.(3)|Ack(4)|SrvType (8)|SrvSubtype(8)|Source ID(o)|Spare(o)| (var)|(var)| (16) | -# 0x11 (0x1F) | 0x11 | 0x01 | | | | | 0xA0 | 0xB8 | -# 0 001 1111 |00010001 | 00000001 | | | | | | | -# 0 1 1111 | 17 | 1 | | | | | | | -# -# - The source ID is present as one byte. For now, ground = 0x00. diff --git a/tc/obsw_pus_tc_frame_packer.py b/tc/obsw_pus_tc_frame_packer.py index 6b00fb7d883b3bc5051a5b34481977e3beedf932..71d045feb2fdf3a4565384d88afd0678ee3fa7e7 100644 --- a/tc/obsw_pus_tc_frame_packer.py +++ b/tc/obsw_pus_tc_frame_packer.py @@ -2,11 +2,12 @@ @brief Helper module to pack telecommand frames. """ from typing import List, Tuple -from tc.obsw_pus_tc_base import PusTelecommand, PusTcInfo -from utility.obsw_logger import get_logger +from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, PusTcInfo +from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() + def pack_tc_frame(tc_list: List[PusTelecommand], max_frame_size: int, do_fill_tc_frame: bool = False) -> Tuple[bytearray, int, int]: """ @@ -32,6 +33,7 @@ def pack_tc_frame(tc_list: List[PusTelecommand], max_frame_size: int, current_frame_size = len(frame) return frame, current_frame_size, packed_commands + def fill_tc_frame(tc_frame: bytearray, max_frame_size: int): """ Fill a given TC frame with 0 until it has the specified frame size. @@ -42,6 +44,7 @@ def fill_tc_frame(tc_frame: bytearray, max_frame_size: int): tc_frame.extend(bytearray(remaining_size_to_fill)) return tc_frame + def pack_tc_info(tc_list: List[PusTelecommand]) -> List[PusTcInfo]: tc_info_list = list() for tc in tc_list: diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py index b3f3d61883a7a13c89d6eed61e1636fe1ca48160..0bb610d448bc1951910a8d9e74d0ff692938ba34 100644 --- a/tc/obsw_pus_tc_packer.py +++ b/tc/obsw_pus_tc_packer.py @@ -9,7 +9,7 @@ Contains the sevice packet which can pack specific service queues (hardcoded for """ import os -from tc.obsw_pus_tc_base import PusTelecommand, TcQueueT +from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, TcQueueT from tc.obsw_tc_service2 import pack_service2_test_into from tc.obsw_tc_service3 import pack_service3_test_into from tc.obsw_tc_service8 import pack_service8_test_into @@ -19,7 +19,7 @@ from tc.obsw_tc_service200 import pack_mode_data, pack_service200_test_into from tc.obsw_tc_service5_17 import pack_service5_test_into, pack_service17_test_into from tc.obsw_tc_gps import pack_gps_test_into -from utility.obsw_logger import get_logger +from tmtc_core.utility.obsw_logger import get_logger import config.obsw_config as g from collections import deque from typing import Union diff --git a/tc/obsw_tc_service2.py b/tc/obsw_tc_service2.py index 670442ff612e49d6c3cfa4f5111588da2a378374..22906fb235d867bc2e09f352b4caf9d10e2380ee 100644 --- a/tc/obsw_tc_service2.py +++ b/tc/obsw_tc_service2.py @@ -8,7 +8,7 @@ import struct import config.obsw_config as g -from tc.obsw_pus_tc_base import PusTelecommand, Deque +from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, Deque from tc.obsw_tc_service200 import pack_mode_data diff --git a/tc/obsw_tc_service20.py b/tc/obsw_tc_service20.py index 2d8756960696d35ab68eda4966833241f761311c..1b316d68e38b0f2750857c2e78392f9968dc1d7e 100644 --- a/tc/obsw_tc_service20.py +++ b/tc/obsw_tc_service20.py @@ -9,7 +9,7 @@ import struct from typing import Deque import config.obsw_config as g -from tc.obsw_pus_tc_base import PusTelecommand +from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand from tc.obsw_tc_service200 import pack_mode_data diff --git a/tc/obsw_tc_service200.py b/tc/obsw_tc_service200.py index 6ed06e889b1b8fb9e596470dc0e7577895ee7364..cf7f5a18a88d663955c87b52631c9851180fa65b 100644 --- a/tc/obsw_tc_service200.py +++ b/tc/obsw_tc_service200.py @@ -5,7 +5,7 @@ @author R. Mueller @date 02.05.2020 """ -from tc.obsw_pus_tc_base import PusTelecommand +from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand from tc.obsw_pus_tc_packer import TcQueueT import config.obsw_config as g import struct diff --git a/tc/obsw_tc_service3.py b/tc/obsw_tc_service3.py index f488cada97fcae1e400e5e1675563e75d8069b5c..774ee2f4864d78ce23684b469c7e019a33c61629 100644 --- a/tc/obsw_tc_service3.py +++ b/tc/obsw_tc_service3.py @@ -7,7 +7,7 @@ """ import struct from typing import Deque -from tc.obsw_pus_tc_base import PusTelecommand +from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand import config.obsw_config as g # adding custom defintion to hk using test pool variables diff --git a/tc/obsw_tc_service8.py b/tc/obsw_tc_service8.py index 5d1b8189cd88d18d4c73b80922f093696d411440..130bc4996265e30a73c82913165456293dc80e2e 100644 --- a/tc/obsw_tc_service8.py +++ b/tc/obsw_tc_service8.py @@ -8,7 +8,7 @@ from typing import Deque import config.obsw_config as g -from tc.obsw_pus_tc_base import PusTelecommand +from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand from tc.obsw_tc_service200 import pack_mode_data diff --git a/tc/obsw_tc_service9.py b/tc/obsw_tc_service9.py index 287d5a500c8dc90860a1d8b75f163a92dfae65b7..507b57ffbba4723c9661970c4194ed97034a8aee 100644 --- a/tc/obsw_tc_service9.py +++ b/tc/obsw_tc_service9.py @@ -8,7 +8,7 @@ from datetime import datetime from tc.obsw_pus_tc_packer import TcQueueT, PusTelecommand -from utility.obsw_logger import get_logger +from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() diff --git a/test/obsw_module_test.py b/test/obsw_module_test.py index ba476d98049c01f97c47945acc4aed5cc8916396..a61bef0436f7fe4d489abb2d230b68eb86648320 100644 --- a/test/obsw_module_test.py +++ b/test/obsw_module_test.py @@ -46,12 +46,12 @@ from typing import Deque from config import obsw_config as g from tc.obsw_pus_tc_packer import pack_dummy_device_test_into -from tc.obsw_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys +from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys from tm.obsw_tm_service_1 import PusPacketInfoService1T -from tm.obsw_pus_tm_base import TmDictionaryKeys +from tmtc_core.tm.obsw_pus_tm_base import TmDictionaryKeys from tm.obsw_pus_tm_factory import PusTmInfoQueueT, PusTmInfoT -from sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver -from utility.obsw_logger import get_logger +from tmtc_core.sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver +from tmtc_core.utility.obsw_logger import get_logger TmInfoQueueService1T = Deque[PusPacketInfoService1T] LOGGER = get_logger() diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index f06c3389febc4556f3a6fb1398bccb6adf9388a0..e81310c60daeffdf5ae48a23f8fafe48ad14508f 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -9,11 +9,11 @@ import unittest from typing import Deque from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys, AssertionDictKeys -from tc.obsw_pus_tc_base import PusTcInfoQueueT +from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoQueueT from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \ pack_service2_test_into, pack_service8_test_into, pack_service200_test_into import config.obsw_config as g -from utility.obsw_logger import get_logger +from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() diff --git a/tm/obsw_pus_tm_base.py b/tm/obsw_pus_tm_base.py deleted file mode 100644 index 8be751a5f3f9d87704b1c2cd26c2424961a6388b..0000000000000000000000000000000000000000 --- a/tm/obsw_pus_tm_base.py +++ /dev/null @@ -1,388 +0,0 @@ -import math -import time - -from utility.obsw_logger import get_logger - -import datetime -from enum import Enum, auto -from typing import Final, Dict -from crcmod import crcmod - -logger = get_logger() - - -class TmDictionaryKeys(Enum): - SERVICE = auto() - SUBSERVICE = auto() - SUBCOUNTER = auto() - SSC = auto() - DATA = auto() - CRC = auto() - VALID = auto() - # Service 1 - TC_PACKET_ID = auto() - TC_SSC = auto() - ERROR_CODE = auto() - STEP_NUMBER = auto() - # Service 5 - EVENT_ID = auto() - REPORTER_ID = auto() - EVENT_PARAM_1 = auto() - EVENT_PARAM_2 = auto() - - -PusTmInfoT = Dict[TmDictionaryKeys, any] - - -class PusTelemetry: - """ - Generic PUS telemetry class representation. - It is instantiated by passing the raw pus telemetry packet (bytearray) to the constructor. - It automatically deserializes the packet, exposing various packet fields via getter functions. - PUS Telemetry structure according to ECSS-E-70-41A p.46. Also see structure below (bottom). - """ - PUS_HEADER_SIZE = 6 - CDS_SHORT_SIZE = 7 - PUS_TIMESTAMP_SIZE = CDS_SHORT_SIZE - - def __init__(self, raw_telemetry: bytearray = bytearray()): - if raw_telemetry == bytearray(): - return - self._packet_raw: Final = raw_telemetry - self._pus_header: Final = PusPacketHeader(raw_telemetry) - self._valid = False - if self._pus_header.length + PusTelemetry.PUS_HEADER_SIZE + 1 > len(raw_telemetry): - logger.error("PusTelemetry: Passed packet shorter than specified packet " - "length in PUS header") - return - self._data_field_header: Final = PusPacketDataFieldHeader(raw_telemetry[6:]) - self._tm_data: Final = raw_telemetry[PusPacketDataFieldHeader.DATA_HEADER_SIZE + - PusTelemetry.PUS_HEADER_SIZE + 1: - len(raw_telemetry) - 2] - self._crc: Final = raw_telemetry[len(raw_telemetry) - 2] << 8 | \ - raw_telemetry[len(raw_telemetry) - 1] - self.print_info = "" - self.__perform_crc_check(raw_telemetry) - - def get_service(self): - """ - :return: Service ID - """ - return self._data_field_header.service_type - - def get_subservice(self): - """ - :return: Subservice ID - """ - return self._data_field_header.service_subtype - - def is_valid(self): - return self._valid - - def get_tm_data(self) -> bytearray: - """ - :return: TM application data (raw) - """ - return self._tm_data - - def pack_tm_information(self) -> PusTmInfoT: - """ - Packs important TM information needed for tests in a convenient dictionary - :return: TM dictionary - """ - tm_information = { - TmDictionaryKeys.SERVICE: self.get_service(), - TmDictionaryKeys.SUBSERVICE: self.get_subservice(), - TmDictionaryKeys.SSC: self.get_ssc(), - TmDictionaryKeys.DATA: self._tm_data, - TmDictionaryKeys.CRC: self._crc, - TmDictionaryKeys.VALID: self._valid - } - return tm_information - - def __perform_crc_check(self, raw_telemetry: bytearray): - crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - if len(raw_telemetry) < self.get_packet_size(): - logger.warning("PusPacketHeader: Invalid packet length") - return - data_to_check = raw_telemetry[0:self.get_packet_size()] - crc = crc_func(data_to_check) - if crc == 0: - self._valid = True - else: - logger.warning("PusPacketHeader: Invalid CRC detected !") - - def specify_packet_info(self, print_info: str): - """ - Caches a print information string for later printing - :param print_info: - :return: - """ - self.print_info = print_info - - def append_packet_info(self, print_info: str): - """ - Similar to the function above, but appends to the existing information string. - :param print_info: - :return: - """ - self.print_info = self.print_info + print_info - - def append_telemetry_content(self, content_list: list): - """ - Default implementation adds the PUS header content to the list which can then be - printed with a simple print() command. To add additional content, override this method - (don't forget to still call this function with super() if the header is required) - :param content_list: Header content will be appended to this list - :return: - """ - self._data_field_header.append_data_field_header(content_list) - self._pus_header.append_pus_packet_header(content_list) - if self.is_valid(): - content_list.append("Yes") - else: - content_list.append("No") - - def append_telemetry_column_headers(self, header_list: list): - """ - Default implementation adds the PUS header content header (confusing, I know) - to the list which can then be printed with a simple print() command. - To add additional headers, override this method - (don't forget to still call this function with super() if the header is required) - :param header_list: Header content will be appended to this list - :return: - """ - self._data_field_header.append_data_field_header_column_header(header_list) - self._pus_header.append_pus_packet_header_column_headers(header_list) - header_list.append("Packet valid") - - def get_raw_packet(self) -> bytes: - """ - Get the whole TM wiretapping_packet as a bytearray (raw) - :return: TM wiretapping_packet - """ - return self._packet_raw - - def get_packet_size(self) -> int: - """ - :return: Size of the TM wiretapping_packet - """ - # PusHeader Size + _tm_data size - size = PusTelemetry.PUS_HEADER_SIZE + self._pus_header.length + 1 - return size - - def get_ssc(self) -> int: - """ - Get the source sequence count - :return: Source Sequence Count (see below, or PUS documentation) - """ - return self._pus_header.source_sequence_count - - def return_full_packet_string(self): - return self.return_data_string(self._packet_raw, len(self._packet_raw)) - - def print_full_packet_string(self): - """ - Print the full TM packet in a clean format. - """ - logger.info(self.return_data_string(self._packet_raw, len(self._packet_raw))) - - def print_source_data(self): - """ - Prints the TM source data in a clean format - :return: - """ - logger.info(self.return_data_string(self._tm_data, len(self._tm_data))) - - def return_source_data(self): - """ - Returns the source data string - """ - return self.return_data_string(self._tm_data, len(self._tm_data)) - - @staticmethod - def return_data_string(byte_array: bytearray, length: int) -> str: - """ - Returns the TM data in a clean printable string format - Prints payload data in default mode - and prints the whole packet if full_packet = True is passed. - :return: - """ - str_to_print = "[" - for index in range(length): - str_to_print += str(hex(byte_array[index])) + " , " - str_to_print = str_to_print.rstrip() - str_to_print = str_to_print.rstrip(',') - str_to_print = str_to_print.rstrip() - str_to_print += ']' - return str_to_print - - -# pylint: disable=too-many-instance-attributes -class PusPacketHeader: - """ - This class unnpacks the PUS packet header, also see PUS structure below or PUS documentation. - """ - - def __init__(self, pus_packet_raw: bytes): - if len(pus_packet_raw) < PusTelemetry.PUS_HEADER_SIZE: - logger.warning("PusPacketHeader: Packet size smaller than PUS header size!") - self.version = 0 - self.type = 0 - self.data_field_header_flag = 0 - self.apid = 0 - self.segmentation_flag = 0 - self.source_sequence_count = 0 - self.length = 0 - return - self.version = pus_packet_raw[0] >> 5 - self.type = pus_packet_raw[0] & 0x10 - self.data_field_header_flag = (pus_packet_raw[0] & 0x8) >> 3 - self.apid = ((pus_packet_raw[0] & 0x7) << 8) | pus_packet_raw[1] - self.segmentation_flag = (pus_packet_raw[2] & 0xC0) >> 6 - self.source_sequence_count = ((pus_packet_raw[2] & 0x3F) << 8) | pus_packet_raw[3] - self.length = pus_packet_raw[4] << 8 | pus_packet_raw[5] - - def append_pus_packet_header(self, array): - array.append(str(chr(self.apid))) - array.append(str(self.source_sequence_count)) - - @staticmethod - def append_pus_packet_header_column_headers(array): - array.append("APID") - array.append("SSC") - - -class PusPacketDataFieldHeader: - """ - Unpacks the PUS packet data field header. - """ - DATA_HEADER_SIZE = PusTelemetry.PUS_TIMESTAMP_SIZE + 4 - def __init__(self, bytes_array: bytearray): - self.pus_version_and_ack_byte = (bytes_array[0] & 0x70) >> 4 - self.service_type = bytes_array[1] - self.service_subtype = bytes_array[2] - self.subcounter = bytes_array[3] - self.time = PusTelemetryTimestamp(bytes_array[4:13]) - - def append_data_field_header(self, content_list: list): - """ - Append important data field header parameters to the passed content list. - :param content_list: - :return: - """ - content_list.append(str(self.service_type)) - content_list.append(str(self.service_subtype)) - content_list.append(str(self.subcounter)) - self.time.print_time(content_list) - - def append_data_field_header_column_header(self, header_list: list): - """ - Append important data field header column headers to the passed list. - :param header_list: - :return: - """ - header_list.append("Service") - header_list.append("Subservice") - header_list.append("Subcounter") - self.time.print_time_headers(header_list) - - - -class PusTelemetryTimestamp: - """ - Unpacks the time datafield of the TM packet. Right now, CDS Short timeformat is used, - and the size of the time stamp is expected to be seven bytes. - """ - CDS_ID = 4 - SECONDS_PER_DAY = 86400 - EPOCH = datetime.datetime.utcfromtimestamp(0) - DAYS_CCSDS_TO_UNIX = 4383 - TIMESTAMP_SIZE = PusTelemetry.PUS_TIMESTAMP_SIZE - def __init__(self, byte_array: bytearray=bytearray([])): - if len(byte_array) > 0: - # pField = byte_array[0] - self.days = ((byte_array[1] << 8) | (byte_array[2])) - \ - PusTelemetryTimestamp.DAYS_CCSDS_TO_UNIX - self.seconds = self.days * (24 * 60 * 60) - s_day = ((byte_array[3] << 24) | (byte_array[4] << 16) | - (byte_array[5]) << 8 | byte_array[6]) / 1000 - self.seconds += s_day - self.time = self.seconds - self.datetime = str(datetime.datetime. - utcfromtimestamp(self.time).strftime("%Y-%m-%d %H:%M:%S.%f")) - - @staticmethod - def pack_current_time() -> bytearray: - """ - Returns a seven byte CDS short timestamp - """ - timestamp = bytearray() - p_field = (PusTelemetryTimestamp.CDS_ID << 4) + 0 - days = (datetime.datetime.utcnow() - PusTelemetryTimestamp.EPOCH).days + \ - PusTelemetryTimestamp.DAYS_CCSDS_TO_UNIX - days_h = (days & 0xFF00) >> 8 - days_l = days & 0xFF - seconds = time.time() - fraction_ms = seconds - math.floor(seconds) - days_ms = int((seconds % PusTelemetryTimestamp.SECONDS_PER_DAY) * 1000 + fraction_ms) - days_ms_hh = (days_ms & 0xFF000000) >> 24 - days_ms_h = (days_ms & 0xFF0000) >> 16 - days_ms_l = (days_ms & 0xFF00) >> 8 - days_ms_ll = (days_ms & 0xFF) - timestamp.append(p_field) - timestamp.append(days_h) - timestamp.append(days_l) - timestamp.append(days_ms_hh) - timestamp.append(days_ms_h) - timestamp.append(days_ms_l) - timestamp.append(days_ms_ll) - return timestamp - - - def print_time(self, array): - array.append(self.time) - array.append(self.datetime) - - @staticmethod - def print_time_headers(array): - array.append("OBSWTime (s)") - array.append("Time") - - -# pylint: disable=line-too-long -# Structure of a PUS Packet : -# A PUS packet consists of consecutive bits, the allocation and structure is standardised. -# Extended information can be found in ECSS-E-70-41A on p.46 -# The easiest form to send a PUS Packet is in hexadecimal form. -# A two digit hexadecimal number equals one byte, 8 bits or one octet -# o = optional, Srv = Service -# -# The structure is shown as follows for TM[17,2] -# 1. Structure Header -# 2. Structure Subheader -# 3. Component (size in bits) -# 4. Hexadecimal number -# 5. Binary Number -# 6. Decimal Number -# -# -------------------------------------------Packet Header(48)------------------------------------------| Packet | -# ----------------Packet ID(16)----------------------|Packet Sequence Control (16)| Packet Length (16) | Data Field | -# Version | Type(1) |Data Field |APID(11) | SequenceFlags(2) |Sequence | | (Variable) | -# Number(3) | |Header Flag(1)| | |Count(14)| | | -# 0x18 | 0x73 | 0xc0 | 0x19 | 0x00 | 0x04 | | -# 000 1 0 000| 01110011 | 11 000000 | 00011001|00000000 | 0000100 | | -# 0 | 1 | 0 | 115(ASCII s) | 3 | 25 | 0 | 4 | | -# -# - Packet Length is an unsigned integer C = Number of Octets in Packet Data Field - 1 -# -# Packet Data Field Structure: -# -# ------------------------------------------------Packet Data Field------------------------------------------------- | -# ---------------------------------Data Field Header ---------------------------|AppData|Spare| PacketErrCtr | -# CCSDS(1)|TM PUS Ver.(3)|Ack(4)|SrvType (8)|SrvSubtype(8)| Time (o) |Spare(o)| (var)|(var)| (16) | -# 0x11 (0x1F) | 0x11 | 0x01 | | | | | Calc. | Calc. | -# 0 001 1111 |00010001 | 00000001 | | | | | | | -# 0 1 1111 | 17 | 2 | | | | | | | -# -# - The source ID is present as one byte. Is it necessary? For now, ground = 0x00. diff --git a/tm/obsw_pus_tm_creator.py b/tm/obsw_pus_tm_creator.py deleted file mode 100644 index 46370f0d4db13dd2ab5ed9f45335847d22bdcc9c..0000000000000000000000000000000000000000 --- a/tm/obsw_pus_tm_creator.py +++ /dev/null @@ -1,103 +0,0 @@ -""" -This module creates the class required to generate PUS telemetry packets. -""" -import crcmod - -from tm.obsw_pus_tm_base import PusTelemetry, PusTelemetryTimestamp -from utility.obsw_logger import get_logger - - -LOGGER = get_logger() - - -# pylint: disable=too-many-instance-attributes -# pylint: disable=too-many-arguments -class PusTelemetryCreator: - """ - Alternative way to create a PUS Telemetry packet by specifying telemetry parameters, - similarly to the way telecommands are created. This can be used to create telemetry - directly in the software. See documentation and more information related to - the ESA PUS standard in the PusTelemetry documentation. - """ - def __init__(self, service: int, subservice: int, ssc: int = 0, - source_data: bytearray = bytearray([]), apid: int = 0x73, version: int = 0): - """ - Initiates the unserialized data fields for the PUS telemetry packet. - """ - # packet type for telemetry is 0 as specified in standard - packet_type = 0 - # specified in standard - data_field_header_flag = 1 - self.packet_id_bytes = [0x0, 0x0] - self.packet_id_bytes[0] = ((version << 5) & 0xE0) | ((packet_type & 0x01) << 4) | \ - ((data_field_header_flag & 0x01) << 3) | ((apid & 0x700) >> 8) - self.packet_id_bytes[1] = apid & 0xFF - self.packet_id = (self.packet_id_bytes[0] << 8) | self.packet_id_bytes[1] - self.ssc = ssc - self.psc = (ssc & 0x3FFF) | (0xC0 << 8) - self.pus_version_and_ack_byte = 0b00011111 - - # NOTE: In PUS-C, the PUS Version is 2 and specified for the first 4 bits. - # The other 4 bits of the first byte are the spacecraft time reference status - # To change to PUS-C, set 0b00100000 - self.data_field_version = 0b00010000 - self.service = service - self.subservice = subservice - self.pack_subcounter = 0 - # it is assumed the time field consts of 8 bytes. - - self.source_data = source_data - - def print(self): - """ Print the raw command in a clean format. """ - packet = self.pack() - print_out = "Telemetry in Hexadecimal: [" - for counter in range(len(packet)): - if counter == len(packet) - 1: - print_out += str(hex(packet[counter])) - else: - print_out += str(hex(packet[counter])) + ", " - print_out += "]" - LOGGER.info(print_out) - - def pack(self) -> bytearray: - """ - Serializes the PUS telemetry into a raw packet. - """ - tm_packet_raw = bytearray() - # PUS Header - tm_packet_raw.extend(self.packet_id_bytes) - tm_packet_raw.append((self.psc & 0xFF00) >> 8) - tm_packet_raw.append(self.psc & 0xFF) - source_length = self.get_source_data_length() - tm_packet_raw.append((source_length & 0xFF00) >> 8) - tm_packet_raw.append(source_length & 0xFF) - # PUS Source Data Field - tm_packet_raw.append(self.data_field_version) - tm_packet_raw.append(self.service) - tm_packet_raw.append(self.subservice) - tm_packet_raw.append(self.pack_subcounter) - tm_packet_raw.extend(PusTelemetryTimestamp.pack_current_time()) - # Source Data - tm_packet_raw.extend(self.source_data) - # CRC16 checksum - crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - crc16 = crc_func(tm_packet_raw) - tm_packet_raw.append((crc16 & 0xFF00) >> 8) - tm_packet_raw.append(crc16 & 0xFF) - return tm_packet_raw - - def get_source_data_length(self) -> int: - """ - Retrieve size of TM packet data header in bytes. - Formula according to PUS Standard: C = (Number of octets in packet source data field) - 1. - The size of the TM packet is the size of the packet secondary header with - the timestamp + the length of the application data + PUS timestamp size + - length of the CRC16 checksum - 1 - """ - try: - data_length = 4 + PusTelemetry.PUS_TIMESTAMP_SIZE + len(self.source_data) + 1 - return data_length - except TypeError: - LOGGER.error("PusTelecommand: Invalid type of application data!") - return 0 diff --git a/tm/obsw_pus_tm_factory.py b/tm/obsw_pus_tm_factory.py index fe6857acd498416d6538690fcc59076be1be1f63..69f837deb5b9a936e053f75da260807caaba7680 100644 --- a/tm/obsw_pus_tm_factory.py +++ b/tm/obsw_pus_tm_factory.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- from typing import Deque, List, Tuple -from tm.obsw_pus_tm_base import PusTelemetry, PusTmInfoT +from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, PusTmInfoT from tm.obsw_tm_service_1 import Service1TM from tm.obsw_tm_service_3 import Service3TM from tm.obsw_tm_service_5 import Service5TM -from utility.obsw_logger import get_logger +from tmtc_core.utility.obsw_logger import get_logger import struct logger = get_logger() diff --git a/tm/obsw_tm_service_1.py b/tm/obsw_tm_service_1.py index 86b132a18f6d39aaab7d2218655ee1222beb0560..b299db9fd3d261aff8218b16d12e75a2fa0e0821 100644 --- a/tm/obsw_tm_service_1.py +++ b/tm/obsw_tm_service_1.py @@ -8,9 +8,9 @@ Author: R. Mueller import struct from typing import Dict -from tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys -from tm.obsw_pus_tm_creator import PusTelemetryCreator -from utility.obsw_logger import get_logger +from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys +from tmtc_core.tm.obsw_pus_tm_creator import PusTelemetryCreator +from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() PusPacketInfoService1T = Dict[TmDictionaryKeys, any] diff --git a/tm/obsw_tm_service_3.py b/tm/obsw_tm_service_3.py index b6f8419c6a46bcded7a253759e85ac419860cc5c..8e5fb596c2884d482e1276c03340cec85110fe4d 100644 --- a/tm/obsw_tm_service_3.py +++ b/tm/obsw_tm_service_3.py @@ -6,7 +6,7 @@ Description: Deserialize Housekeeping TM Author: R. Mueller """ -from tm.obsw_pus_tm_base import PusTelemetry +from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry from typing import Type import struct diff --git a/tm/obsw_tm_service_5.py b/tm/obsw_tm_service_5.py index d0a91ee7b4f93d49edcb590f4c9bcf210f8423e2..f096f27f6cf96fea585aa7144a94d311689bc4cb 100644 --- a/tm/obsw_tm_service_5.py +++ b/tm/obsw_tm_service_5.py @@ -6,7 +6,7 @@ Description: Deserialize PUS Event Report Author: R. Mueller """ -from tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys +from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys from tm.obsw_pus_tm_factory import PusTmInfoT import struct diff --git a/tmtc_core b/tmtc_core new file mode 160000 index 0000000000000000000000000000000000000000..b099c5bb9d83a6e5ddf481271feb21255d59952d --- /dev/null +++ b/tmtc_core @@ -0,0 +1 @@ +Subproject commit b099c5bb9d83a6e5ddf481271feb21255d59952d diff --git a/utility/dle_encoder.py b/utility/dle_encoder.py deleted file mode 100644 index d7abcabba2693997e4bdbeea25c8a444832b2534..0000000000000000000000000000000000000000 --- a/utility/dle_encoder.py +++ /dev/null @@ -1,10 +0,0 @@ -""" -@brief DLE Encoder Implementation -@details -DLE encoding can be used to provide a simple transport layer for serial data. -A give data stream is encoded by adding a STX char at the beginning and an ETX char at the end. -All STX and ETX occurences in the packet are encoded as well so the receiver can simply look -for STX and ETX occurences to identify packets. -""" - -# TODO: Implementation / Translation of C code diff --git a/utility/hammingcode.py b/utility/hammingcode.py deleted file mode 100644 index c59508eb8d82f330014136d974162128490be9d0..0000000000000000000000000000000000000000 --- a/utility/hammingcode.py +++ /dev/null @@ -1,9 +0,0 @@ -""" -@brief Hamming Code Implementation -@details -Hamming codes belong to the family of linear error correcting codes. -Documentation: https://en.wikipedia.org/wiki/Hamming_code -They can be used to identify up to two bit error and correct on bit error. -""" - -# TODO: Implementation / Translation of C code diff --git a/utility/obsw_args_parser.py b/utility/obsw_args_parser.py index 67a6fc02ba0eeb5053e73f6426664124436b725e..bef39878d7c7eb07d7c2766e11bb4d11da000167 100644 --- a/utility/obsw_args_parser.py +++ b/utility/obsw_args_parser.py @@ -4,7 +4,7 @@ Argument parser module. """ import argparse import sys -from utility.obsw_logger import get_logger +from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() diff --git a/utility/obsw_exit_handler.py b/utility/obsw_exit_handler.py deleted file mode 100644 index 437eef10837c7e56c3e5ea584fb02090109af86e..0000000000000000000000000000000000000000 --- a/utility/obsw_exit_handler.py +++ /dev/null @@ -1,28 +0,0 @@ -import signal -from comIF.obsw_com_interface import CommunicationInterface -from utility.obsw_logger import get_logger - -logger = get_logger() - - -def keyboard_interrupt_handler(com_interface: CommunicationInterface): - logger.info("Disconnect registered") - # Unit Test closes Serial Port at the end - # We could do some optional stuff here - if com_interface is not None: - com_interface.send_telecommand(bytearray([0, 0, 0, 0, 0])) - - -class GracefulKiller: - kill_now = False - - def __init__(self): - signal.signal(signal.SIGINT, self.exit_gracefully) - signal.signal(signal.SIGTERM, self.exit_gracefully) - - def exit_gracefully(self): - self.kill_now = True - print("I was killed") - - - diff --git a/utility/obsw_logger.py b/utility/obsw_logger.py deleted file mode 100644 index 736a656a80676c0e2d32a7a41da95d6464e2d87c..0000000000000000000000000000000000000000 --- a/utility/obsw_logger.py +++ /dev/null @@ -1,81 +0,0 @@ -""" -@brief This module is used to set up the global loggers -""" -import logging -import os -import sys -import config.obsw_config as g - - -# pylint: disable=arguments-differ -# pylint: disable=too-few-public-methods -class InfoFilter(logging.Filter): - """ - Filter object, which is used so that only INFO and DEBUG messages are printed to stdout. - """ - def filter(self, rec): - if rec.levelno == logging.INFO: - return rec.levelno - return None - - -class DebugFilter(logging.Filter): - """ - Filter object, which is used so that only INFO and DEBUG messages are printed to stdout. - """ - def filter(self, rec): - if rec.levelno == logging.DEBUG: - return rec.levelno - return None - - -def set_tmtc_logger() -> logging.Logger: - """ - Sets the logger object which will be used globally. - """ - logger = logging.getLogger(g.G_TMTC_LOGGER_NAME) - logger.setLevel(level=logging.DEBUG) - generic_format = logging.Formatter( - fmt='%(levelname)-8s: %(asctime)s.%(msecs)03d | %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') - fault_format = logging.Formatter( - fmt='%(levelname)-8s: %(asctime)s.%(msecs)03d | [%(filename)s:%(lineno)d] | %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') - - console_info_handler = logging.StreamHandler(stream=sys.stdout) - console_info_handler.setLevel(logging.INFO) - console_info_handler.addFilter(InfoFilter()) - - console_debug_handler = logging.StreamHandler(stream=sys.stdout) - console_debug_handler.setLevel(logging.DEBUG) - console_debug_handler.addFilter(DebugFilter()) - - console_error_handler = logging.StreamHandler(stream=sys.stderr) - console_error_handler.setLevel(logging.WARNING) - try: - error_file_handler = logging.FileHandler( - filename="log/" + g.G_ERROR_LOG_FILE_NAME, encoding='utf-8', mode='w') - except FileNotFoundError: - os.mkdir("log") - error_file_handler = logging.FileHandler( - filename="log/" + g.G_ERROR_LOG_FILE_NAME, encoding='utf-8', mode='w') - - error_file_handler.setLevel(level=logging.WARNING) - - error_file_handler.setFormatter(generic_format) - console_info_handler.setFormatter(generic_format) - console_debug_handler.setFormatter(fault_format) - console_error_handler.setFormatter(fault_format) - logger.addHandler(error_file_handler) - logger.addHandler(console_info_handler) - logger.addHandler(console_debug_handler) - logger.addHandler(console_error_handler) - return logger - - -def get_logger() -> logging.Logger: - """ - Get the global logger instance. - """ - logger = logging.getLogger(g.G_TMTC_LOGGER_NAME) - return logger diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index 24f7703709642429b29ad3d661c1b49764a37815..49590506e711751da851049b14a8d81774b1279a 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -11,11 +11,11 @@ import os import enum from config import obsw_config as g -from tm.obsw_pus_tm_base import PusTelemetry +from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry from tm.obsw_pus_tm_factory import PusTmQueueT from tm.obsw_tm_service_3 import Service3TM -from tc.obsw_pus_tc_base import PusTcInfoT, TcDictionaryKeys -from utility.obsw_logger import get_logger +from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoT, TcDictionaryKeys +from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger()