From cec4be40a16144b29fe410014c164490fe427e23 Mon Sep 17 00:00:00 2001 From: "Robin.Mueller" <robin.mueller.m@gmail.com> Date: Mon, 6 Jul 2020 15:13:23 +0200 Subject: [PATCH] introduced tmtc frame packer --- comIF/obsw_com_interface.py | 6 +++++ comIF/obsw_qemu_com_if.py | 9 ++++--- comIF/obsw_serial_com_if.py | 3 +++ tc/obsw_pus_tc_base.py | 3 ++- tc/obsw_pus_tc_frame_packer.py | 49 ++++++++++++++++++++++++++++++++++ 5 files changed, 66 insertions(+), 4 deletions(-) create mode 100644 tc/obsw_pus_tc_frame_packer.py diff --git a/comIF/obsw_com_interface.py b/comIF/obsw_com_interface.py index 76695e5..75e4bc3 100644 --- a/comIF/obsw_com_interface.py +++ b/comIF/obsw_com_interface.py @@ -32,6 +32,12 @@ class CommunicationInterface: :return: """ + @abstractmethod + def send_data(self, data: bytearray): + """ + Send data, for example a frame containing packets. + """ + @abstractmethod def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: """ diff --git a/comIF/obsw_qemu_com_if.py b/comIF/obsw_qemu_com_if.py index aae10f7..035ec6a 100755 --- a/comIF/obsw_qemu_com_if.py +++ b/comIF/obsw_qemu_com_if.py @@ -74,12 +74,15 @@ class QEMUComIF(CommunicationInterface): def close(self): self.usart.close() + def send_data(self, data: bytearray): + asyncio.run_coroutine_threadsafe( + self.send_telecommand_async(data), self.loop).result() + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: asyncio.run_coroutine_threadsafe( - self.send_telecommand_async(tc_packet, tc_packet_info), self.loop).result() + self.send_telecommand_async(tc_packet), self.loop).result() - async def send_telecommand_async(self, tc_packet, tc_packet_info): - self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info) + async def send_telecommand_async(self, tc_packet): await self.usart.write(tc_packet) self.usart.inject_timeout_error() diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index 21c5dd8..8cabd24 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -43,6 +43,9 @@ class SerialComIF(CommunicationInterface): except serial.SerialException as e: logging.exception("Serial Port could not be closed! Traceback: " + str(e)) + def send_data(self, data: bytearray): + self.serial.write(data) + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: self.serial.write(tc_packet) diff --git a/tc/obsw_pus_tc_base.py b/tc/obsw_pus_tc_base.py index 5ba96f7..c539b14 100644 --- a/tc/obsw_pus_tc_base.py +++ b/tc/obsw_pus_tc_base.py @@ -20,7 +20,8 @@ class TcDictionaryKeys(Enum): DATA = 5 -PusTcInfoT = Union[Dict[TcDictionaryKeys, any], None] +PusTcInfo = Dict[TcDictionaryKeys, any] +PusTcInfoT = Union[PusTcInfo, None] PusTcTupleT = Tuple[bytearray, PusTcInfoT] TcAuxiliaryTupleT = Tuple[str, any] TcQueueEntryT = Union[TcAuxiliaryTupleT, PusTcTupleT] diff --git a/tc/obsw_pus_tc_frame_packer.py b/tc/obsw_pus_tc_frame_packer.py new file mode 100644 index 0000000..6b00fb7 --- /dev/null +++ b/tc/obsw_pus_tc_frame_packer.py @@ -0,0 +1,49 @@ +""" +@brief Helper module to pack telecommand frames. +""" +from typing import List, Tuple +from tc.obsw_pus_tc_base import PusTelecommand, PusTcInfo +from utility.obsw_logger import get_logger + +LOGGER = get_logger() + +def pack_tc_frame(tc_list: List[PusTelecommand], max_frame_size: int, + do_fill_tc_frame: bool = False) -> Tuple[bytearray, int, int]: + """ + Packs a given list of PusTelecommands into a TC frame. + :param tc_list: List of PUS telecommands + :param max_frame_size: Maximum allowed or desired frame size. Used to perform range checks + :param do_fill_tc_frame: If this is set to true, the frame is filled up to max_frame_size with + zeros + :return: A tuples consisting of the TC frame, the current size of the frame and the number + of packed commands. + """ + frame = bytearray() + packed_commands = 0 + for tc in tc_list: + next_packet_raw = tc.pack() + if len(frame) + len(next_packet_raw) > max_frame_size: + LOGGER.warning("Next telecommands would be too large for TC frame, skipping.") + break + frame += next_packet_raw + packed_commands += 1 + if do_fill_tc_frame: + frame = fill_tc_frame(frame, max_frame_size) + current_frame_size = len(frame) + return frame, current_frame_size, packed_commands + +def fill_tc_frame(tc_frame: bytearray, max_frame_size: int): + """ + Fill a given TC frame with 0 until it has the specified frame size. + """ + current_frame_len = len(tc_frame) + remaining_size_to_fill = max_frame_size - current_frame_len + if remaining_size_to_fill > 0: + tc_frame.extend(bytearray(remaining_size_to_fill)) + return tc_frame + +def pack_tc_info(tc_list: List[PusTelecommand]) -> List[PusTcInfo]: + tc_info_list = list() + for tc in tc_list: + tc_info_list.append(tc.pack_information()) + return tc_info_list \ No newline at end of file -- GitLab