Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authoredRobin.Mueller authored
obsw_serial_com_if.py 5.82 KiB
"""
@file obsw_serial_com_if.py
@date 01.11.2019
@brief Serial Communication Interface
@author R. Mueller
"""
import sys
import time
import logging
from typing import Tuple, List, Union, Optional
import serial
from comIF.obsw_com_interface import CommunicationInterface, PusTmQueueT
from utility.obsw_tmtc_printer import TmTcPrinterT
from tm.obsw_pus_tm_factory import PusTelemetryFactory
from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmTupleQueueT, PusTmListT
from tc.obsw_pus_tc_base import PusTcInfoT
SERIAL_PACKET_MAX_SIZE = 1024
HEADER_BYTES_BEFORE_SIZE = 5
# pylint: disable=arguments-differ
class SerialComIF(CommunicationInterface):
"""
Communication Interface to use serial communication. This requires the PySerial library
on Windows. It has been tested only on Windows.
"""
def __init__(self, tmtc_printer: TmTcPrinterT, com_port: str, baud_rate: int,
serial_timeout: float):
super().__init__(tmtc_printer)
try:
self.serial = serial.Serial(port=com_port, baudrate=baud_rate, timeout=serial_timeout)
except serial.SerialException:
print("Serial Port can not be opened")
logging.exception("Error")
sys.exit()
self.data = bytearray()
self.number_of_packets = 0
def close(self):
try:
self.serial.close()
except serial.SerialException:
print("Serial Port could not be closed !")
logging.exception("Error")
sys.exit()
def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None:
self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info)
self.serial.write(tc_packet)
def receive_telemetry(self, parameters: any = 0) -> list:
(packet_received, packet_list) = self.poll_interface()
if packet_received:
return packet_list
return []
def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]:
if self.data_available():
pus_data_list, number_of_packets = self.__poll_pus_packets()
packet_list = []
for counter in range(0, number_of_packets):
packet = PusTelemetryFactory.create(pus_data_list[counter])
self.tmtc_printer.print_telemetry(packet)
packet_list.append(packet)
return True, packet_list
return False, []
def data_available(self, timeout: float = 0) -> bool:
if self.serial.in_waiting > 0:
return True
if timeout > 0:
start_time = time.time()
elapsed_time = 0
while elapsed_time < timeout:
if self.serial.in_waiting > 0:
return True
elapsed_time = time.time() - start_time
return False
def __poll_pus_packets(self) -> Tuple[List[Optional[bytes]], int]:
pus_data_list = []
self.data = self.serial.read(1024)
packet_size = (self.data[4] << 8 | self.data[5]) + 7
read_size = len(self.data)
self.number_of_packets = 1
if read_size < packet_size:
print("Serial Com IF: Size missmatch when polling PUS packet. Packet Size: " +
str(packet_size) + ". Read Size: " + str(read_size) + ". Check timeout too")
if read_size > packet_size:
self.__handle_multiple_packets(packet_size, read_size, pus_data_list)
else:
pus_data_list.append(self.data)
return pus_data_list, self.number_of_packets
def __handle_multiple_packets(self, packet_size: int, read_size: int, pus_data_list: list):
end_of_buffer = read_size - 1
end_index = packet_size
start_index = 0
pus_data = self.data[start_index:end_index]
pus_data_list.append(pus_data)
while end_index < end_of_buffer:
end_index = self.__parse_next_packets(end_index, pus_data_list)
def __parse_next_packets(self, end_index: int, pus_data_list: list) -> int:
start_index = end_index
end_index = end_index + 5
next_packet_size = (self.data[end_index - 1] << 8 | self.data[end_index]) + 7
if next_packet_size > SERIAL_PACKET_MAX_SIZE:
print("PUS Polling: Very Large packet detected, "
"large packet reading not implemented yet !")
print("Detected Size: " + str(next_packet_size))
return SERIAL_PACKET_MAX_SIZE
end_index = start_index + next_packet_size
pus_data = self.data[start_index:end_index]
pus_data_list.append(pus_data)
self.number_of_packets = self.number_of_packets + 1
return end_index
def receive_telemetry_and_store_tm(self, tm_queue: PusTmQueueT) -> Union[None, PusTmQueueT]:
(packet_received, pus_packets) = self.poll_interface()
if packet_received:
for packet in pus_packets:
tm_queue.append(packet)
return tm_queue
def receive_telemetry_and_store_info(self, tm_info_queue: PusTmInfoQueueT) -> \
Union[None, PusTmInfoQueueT]:
(packet_received, pus_packets) = self.poll_interface()
if packet_received:
for packet in pus_packets:
tm_info = packet.pack_tm_information()
tm_info_queue.append(tm_info)
return tm_info_queue
def receive_telemetry_and_store_tuple(self, tm_tuple_queue: PusTmTupleQueueT) -> \
Union[None, PusTmTupleQueueT]:
(packet_received, pus_packets) = self.poll_interface()
if packet_received:
for packet in pus_packets:
tm_info = packet.pack_tm_information()
tm_tuple_queue.append((tm_info, packet))
return tm_tuple_queue