diff --git a/comIF/obsw_com_interface.py b/comIF/obsw_com_interface.py index 76695e52fe1a54f190cc8c42b6477d7002b24aca..75e4bc38f14a37f59a64d893c0b5410f56c7abd7 100644 --- a/comIF/obsw_com_interface.py +++ b/comIF/obsw_com_interface.py @@ -32,6 +32,12 @@ class CommunicationInterface: :return: """ + @abstractmethod + def send_data(self, data: bytearray): + """ + Send data, for example a frame containing packets. + """ + @abstractmethod def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: """ diff --git a/comIF/obsw_qemu_com_if.py b/comIF/obsw_qemu_com_if.py index aae10f710542b00e008f0d5b4034e67c1b5836ca..035ec6a23208b1d20769740dbc1eed400f64d04d 100755 --- a/comIF/obsw_qemu_com_if.py +++ b/comIF/obsw_qemu_com_if.py @@ -74,12 +74,15 @@ class QEMUComIF(CommunicationInterface): def close(self): self.usart.close() + def send_data(self, data: bytearray): + asyncio.run_coroutine_threadsafe( + self.send_telecommand_async(data), self.loop).result() + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: asyncio.run_coroutine_threadsafe( - self.send_telecommand_async(tc_packet, tc_packet_info), self.loop).result() + self.send_telecommand_async(tc_packet), self.loop).result() - async def send_telecommand_async(self, tc_packet, tc_packet_info): - self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info) + async def send_telecommand_async(self, tc_packet): await self.usart.write(tc_packet) self.usart.inject_timeout_error() diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index 21c5dd8b2c1e24d481fe81e4cc2ec6a4cd1d93ce..8cabd24e696fcc93eb22fed4e791844e579f1b2b 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -43,6 +43,9 @@ class SerialComIF(CommunicationInterface): except serial.SerialException as e: logging.exception("Serial Port could not be closed! Traceback: " + str(e)) + def send_data(self, data: bytearray): + self.serial.write(data) + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: self.serial.write(tc_packet) diff --git a/tc/obsw_pus_tc_base.py b/tc/obsw_pus_tc_base.py index 5ba96f7f77f256e6258ba74fcb04ad1263416f7c..c539b14fc86710179be01412f8dd9fbd8ccaa74a 100644 --- a/tc/obsw_pus_tc_base.py +++ b/tc/obsw_pus_tc_base.py @@ -20,7 +20,8 @@ class TcDictionaryKeys(Enum): DATA = 5 -PusTcInfoT = Union[Dict[TcDictionaryKeys, any], None] +PusTcInfo = Dict[TcDictionaryKeys, any] +PusTcInfoT = Union[PusTcInfo, None] PusTcTupleT = Tuple[bytearray, PusTcInfoT] TcAuxiliaryTupleT = Tuple[str, any] TcQueueEntryT = Union[TcAuxiliaryTupleT, PusTcTupleT] diff --git a/tc/obsw_pus_tc_frame_packer.py b/tc/obsw_pus_tc_frame_packer.py new file mode 100644 index 0000000000000000000000000000000000000000..6b00fb7d883b3bc5051a5b34481977e3beedf932 --- /dev/null +++ b/tc/obsw_pus_tc_frame_packer.py @@ -0,0 +1,49 @@ +""" +@brief Helper module to pack telecommand frames. +""" +from typing import List, Tuple +from tc.obsw_pus_tc_base import PusTelecommand, PusTcInfo +from utility.obsw_logger import get_logger + +LOGGER = get_logger() + +def pack_tc_frame(tc_list: List[PusTelecommand], max_frame_size: int, + do_fill_tc_frame: bool = False) -> Tuple[bytearray, int, int]: + """ + Packs a given list of PusTelecommands into a TC frame. + :param tc_list: List of PUS telecommands + :param max_frame_size: Maximum allowed or desired frame size. Used to perform range checks + :param do_fill_tc_frame: If this is set to true, the frame is filled up to max_frame_size with + zeros + :return: A tuples consisting of the TC frame, the current size of the frame and the number + of packed commands. + """ + frame = bytearray() + packed_commands = 0 + for tc in tc_list: + next_packet_raw = tc.pack() + if len(frame) + len(next_packet_raw) > max_frame_size: + LOGGER.warning("Next telecommands would be too large for TC frame, skipping.") + break + frame += next_packet_raw + packed_commands += 1 + if do_fill_tc_frame: + frame = fill_tc_frame(frame, max_frame_size) + current_frame_size = len(frame) + return frame, current_frame_size, packed_commands + +def fill_tc_frame(tc_frame: bytearray, max_frame_size: int): + """ + Fill a given TC frame with 0 until it has the specified frame size. + """ + current_frame_len = len(tc_frame) + remaining_size_to_fill = max_frame_size - current_frame_len + if remaining_size_to_fill > 0: + tc_frame.extend(bytearray(remaining_size_to_fill)) + return tc_frame + +def pack_tc_info(tc_list: List[PusTelecommand]) -> List[PusTcInfo]: + tc_info_list = list() + for tc in tc_list: + tc_info_list.append(tc.pack_information()) + return tc_info_list \ No newline at end of file