Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_serial_com_if.py 5.82 KiB
"""
@file   obsw_serial_com_if.py
@date   01.11.2019
@brief  Serial Communication Interface

@author R. Mueller
"""
import sys
import time
import logging
from typing import Tuple, List, Union, Optional

import serial
from comIF.obsw_com_interface import CommunicationInterface, PusTmQueueT
from utility.obsw_tmtc_printer import TmTcPrinterT
from tm.obsw_pus_tm_factory import PusTelemetryFactory
from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmTupleQueueT, PusTmListT
from tc.obsw_pus_tc_base import PusTcInfoT

SERIAL_PACKET_MAX_SIZE = 1024
HEADER_BYTES_BEFORE_SIZE = 5


# pylint: disable=arguments-differ
class SerialComIF(CommunicationInterface):
    """
    Communication Interface to use serial communication. This requires the PySerial library
    on Windows. It has been tested only on Windows.
    """
    def __init__(self, tmtc_printer: TmTcPrinterT, com_port: str, baud_rate: int,
                 serial_timeout: float):
        super().__init__(tmtc_printer)
        try:
            self.serial = serial.Serial(port=com_port, baudrate=baud_rate, timeout=serial_timeout)
        except serial.SerialException:
            print("Serial Port can not be opened")
            logging.exception("Error")
            sys.exit()
        self.data = bytearray()
        self.number_of_packets = 0

    def close(self):
        try:
            self.serial.close()
        except serial.SerialException:
            print("Serial Port could not be closed !")
            logging.exception("Error")
            sys.exit()

    def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None:
        self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info)
        self.serial.write(tc_packet)

    def receive_telemetry(self, parameters: any = 0) -> list:
        (packet_received, packet_list) = self.poll_interface()
        if packet_received:
            return packet_list
        return []

    def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]:
        if self.data_available():
            pus_data_list, number_of_packets = self.__poll_pus_packets()
            packet_list = []
            for counter in range(0, number_of_packets):
                packet = PusTelemetryFactory.create(pus_data_list[counter])
                self.tmtc_printer.print_telemetry(packet)
                packet_list.append(packet)
            return True, packet_list
        return False, []

    def data_available(self, timeout: float = 0) -> bool:
        if self.serial.in_waiting > 0:
            return True
        if timeout > 0:
            start_time = time.time()
            elapsed_time = 0
            while elapsed_time < timeout:
                if self.serial.in_waiting > 0:
                    return True
                elapsed_time = time.time() - start_time
        return False

    def __poll_pus_packets(self) -> Tuple[List[Optional[bytes]], int]:
        pus_data_list = []
        self.data = self.serial.read(1024)
        packet_size = (self.data[4] << 8 | self.data[5]) + 7
        read_size = len(self.data)
        self.number_of_packets = 1
        if read_size < packet_size:
            print("Serial Com IF: Size missmatch when polling PUS packet. Packet Size: " +
                  str(packet_size) + ". Read Size: " + str(read_size) + ". Check timeout too")
        if read_size > packet_size:
            self.__handle_multiple_packets(packet_size, read_size, pus_data_list)
        else:
            pus_data_list.append(self.data)
        return pus_data_list, self.number_of_packets

    def __handle_multiple_packets(self, packet_size: int, read_size: int, pus_data_list: list):
        end_of_buffer = read_size - 1
        end_index = packet_size
        start_index = 0
        pus_data = self.data[start_index:end_index]
        pus_data_list.append(pus_data)
        while end_index < end_of_buffer:
            end_index = self.__parse_next_packets(end_index, pus_data_list)

    def __parse_next_packets(self, end_index: int, pus_data_list: list) -> int:
        start_index = end_index
        end_index = end_index + 5
        next_packet_size = (self.data[end_index - 1] << 8 | self.data[end_index]) + 7
        if next_packet_size > SERIAL_PACKET_MAX_SIZE:
            print("PUS Polling: Very Large packet detected, "
                  "large packet reading not implemented yet !")
            print("Detected Size: " + str(next_packet_size))
            return SERIAL_PACKET_MAX_SIZE
        end_index = start_index + next_packet_size
        pus_data = self.data[start_index:end_index]
        pus_data_list.append(pus_data)
        self.number_of_packets = self.number_of_packets + 1
        return end_index

    def receive_telemetry_and_store_tm(self, tm_queue: PusTmQueueT) -> Union[None, PusTmQueueT]:
        (packet_received, pus_packets) = self.poll_interface()
        if packet_received:
            for packet in pus_packets:
                tm_queue.append(packet)
        return tm_queue

    def receive_telemetry_and_store_info(self, tm_info_queue: PusTmInfoQueueT) -> \
            Union[None, PusTmInfoQueueT]:
        (packet_received, pus_packets) = self.poll_interface()
        if packet_received:
            for packet in pus_packets:
                tm_info = packet.pack_tm_information()
                tm_info_queue.append(tm_info)
        return tm_info_queue

    def receive_telemetry_and_store_tuple(self, tm_tuple_queue: PusTmTupleQueueT) -> \
            Union[None, PusTmTupleQueueT]:
        (packet_received, pus_packets) = self.poll_interface()
        if packet_received:
            for packet in pus_packets:
                tm_info = packet.pack_tm_information()
                tm_tuple_queue.append((tm_info, packet))
        return tm_tuple_queue