Skip to content
Snippets Groups Projects
Commit 8d74d79a authored by Robin Mueller's avatar Robin Mueller
Browse files

optimized serial interfaces

parent 0509ca7b
No related branches found
No related tags found
No related merge requests found
......@@ -6,7 +6,7 @@ import sys
from comIF.obsw_com_interface import CommunicationInterface
from comIF.obsw_dummy_com_if import DummyComIF
from comIF.obsw_ethernet_com_if import EthernetComIF
from comIF.obsw_serial_com_if import SerialComIF
from comIF.obsw_serial_com_if import SerialComIF, SerialCommunicationType
from comIF.obsw_qemu_com_if import QEMUComIF
from utility.obsw_logger import get_logger
......@@ -32,9 +32,12 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> CommunicationInter
elif g.G_COM_IF == g.ComIF.Serial:
serial_baudrate = g.G_SERIAL_BAUDRATE
serial_timeout = g.G_SERIAL_TIMEOUT
serial_frame_size = g.G_SERIAL_FRAME_SIZE
communication_interface = SerialComIF(
tmtc_printer=tmtc_printer, com_port=g.G_COM_PORT, baud_rate=serial_baudrate,
serial_timeout=serial_timeout)
serial_timeout=serial_timeout,
ser_com_type=SerialCommunicationType.FIXED_FRAME_BASED,
com_type_args=serial_frame_size)
elif g.G_COM_IF == g.ComIF.QEMU:
communication_interface = QEMUComIF(
tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT,
......
......@@ -45,7 +45,6 @@ class EthernetComIF(CommunicationInterface):
self.udp_socket.sendto(tc_packet, self.destination_address)
def data_available(self, timeout: float = 0) -> bool:
# print("polling" + str(self.udp_socket))
ready = select.select([self.udp_socket], [], [], timeout)
if ready[0]:
return True
......@@ -83,8 +82,8 @@ class EthernetComIF(CommunicationInterface):
def connect_to_board(self):
"""
For UDP, this is used to initiate communication.
For UDP, this can be used to initiate communication.
:return:
"""
test = bytearray([])
self.send_telecommand(test)
\ No newline at end of file
ping = bytearray([])
self.send_telecommand(ping)
......@@ -29,8 +29,10 @@ from typing import Tuple
from logging import getLogger
from comIF.obsw_com_interface import CommunicationInterface, PusTcInfoT, PusTmListT
from tm.obsw_pus_tm_factory import PusTelemetryFactory
from comIF.obsw_serial_com_if import SerialComIF
LOGGER = getLogger()
SERIAL_FRAME_LENGTH = 256
# Paths to Unix Domain Sockets used by the emulator
QEMU_ADDR_QMP = "/tmp/qemu"
......@@ -100,14 +102,14 @@ class QEMUComIF(CommunicationInterface):
def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]:
packet_list = []
if self.data_available():
pus_data_list, number_of_packets = self.poll_pus_packets()
for counter in range(0, number_of_packets):
packet = PusTelemetryFactory.create(pus_data_list[counter])
pus_data_list = self.poll_pus_packets()
for pus_packet in pus_data_list:
packet = PusTelemetryFactory.create(pus_packet)
packet_list.append(packet)
return True, packet_list
return False, packet_list
def data_available(self, timeout: any=0) -> bool:
def data_available(self, timeout: any = 0) -> bool:
if self.usart.new_data_available():
return True
if timeout > 0:
......@@ -124,43 +126,48 @@ class QEMUComIF(CommunicationInterface):
return packets
def poll_pus_packets_async(self):
pus_data_list = []
self.data = self.usart.read(256)
payload_length = (self.data[4] << 8 | self.data[5])
packet_size = payload_length + 7
if payload_length == 0:
return [], 0
read_size = len(self.data)
self.number_of_packets = 1
pus_data = self.data[0:packet_size]
pus_data_list.append(pus_data)
self.__handle_multiple_packets(packet_size, read_size, pus_data_list)
return pus_data_list, self.number_of_packets
def __handle_multiple_packets(self, packet_size, read_size, pus_data_list):
end_of_buffer = read_size - 1
end_index = packet_size
while end_index < end_of_buffer:
end_index = self.__parse_next_packets(end_index, pus_data_list)
def __parse_next_packets(self, end_index: int, pus_data_list: list) -> int:
start_index = end_index
end_index = end_index + 5
next_payload_len = (self.data[end_index - 1] << 8 | self.data[end_index])
next_packet_size = next_payload_len + 7
if next_packet_size > 256:
print("PUS Polling: Very large packet detected, "
"large packet reading not implemented yet !")
print("Detected Size: " + str(next_packet_size))
return end_index
if next_payload_len == 0:
end_index = 256
return end_index
end_index = start_index + next_packet_size
pus_data = self.data[start_index:end_index]
pus_data_list.append(pus_data)
self.number_of_packets = self.number_of_packets + 1
return end_index
self.data = self.usart.read(SERIAL_FRAME_LENGTH)
return SerialComIF.poll_pus_packets_fixed_frames(self.data)
# todo: use function from serial com IF to deducde redundant code.
# def poll_pus_packets_async(self):
# pus_data_list = []
# self.data = self.usart.read(256)
# payload_length = (self.data[4] << 8 | self.data[5])
# packet_size = payload_length + 7
# if payload_length == 0:
# return [], 0
# read_size = len(self.data)
# self.number_of_packets = 1
# pus_data = self.data[0:packet_size]
# pus_data_list.append(pus_data)
# self.__handle_multiple_packets(packet_size, read_size, pus_data_list)
# return pus_data_list, self.number_of_packets
#
# def __handle_multiple_packets(self, packet_size, read_size, pus_data_list):
# end_of_buffer = read_size - 1
# end_index = packet_size
# while end_index < end_of_buffer:
# end_index = self.__parse_next_packets(end_index, pus_data_list)
#
# def __parse_next_packets(self, end_index: int, pus_data_list: list) -> int:
# start_index = end_index
# end_index = end_index + 5
# next_payload_len = (self.data[end_index - 1] << 8 | self.data[end_index])
# next_packet_size = next_payload_len + 7
# if next_packet_size > 256:
# print("PUS Polling: Very large packet detected, "
# "large packet reading not implemented yet !")
# print("Detected Size: " + str(next_packet_size))
# return end_index
# if next_payload_len == 0:
# end_index = 256
# return end_index
# end_index = start_index + next_packet_size
# pus_data = self.data[start_index:end_index]
# pus_data_list.append(pus_data)
# self.number_of_packets = self.number_of_packets + 1
# return end_index
class QmpException(Exception):
......
......@@ -6,7 +6,8 @@
"""
import time
import logging
from typing import Tuple, List, Optional
from typing import Tuple
from enum import Enum
import serial
from comIF.obsw_com_interface import CommunicationInterface
......@@ -21,21 +22,39 @@ SERIAL_FRAME_LENGTH = 256
HEADER_BYTES_BEFORE_SIZE = 5
class SerialCommunicationType(Enum):
TIMEOUT_BASED = 0
FIXED_FRAME_BASED = 1
DLE_ENCODING = 2
# pylint: disable=arguments-differ
class SerialComIF(CommunicationInterface):
"""
Communication Interface to use serial communication. This requires the PySerial library
on Windows. It has been tested only on Windows.
Communication Interface to use serial communication. This requires the PySerial library.
"""
def __init__(self, tmtc_printer: TmTcPrinter, com_port: str, baud_rate: int,
serial_timeout: float):
serial_timeout: float,
ser_com_type: SerialCommunicationType = SerialCommunicationType.FIXED_FRAME_BASED,
com_type_args: any = 0):
"""
Initiaze a serial communication handler.
:param tmtc_printer: TMTC printer object. Can be used for diagnostic purposes, but main
packet handling should be done by a separate thread.
:param com_port: Specify COM port.
:param baud_rate: Specify baud rate
:param serial_timeout: Specify serial timeout
:param ser_com_type: Specify how to handle serial reception
"""
super().__init__(tmtc_printer)
try:
self.serial = serial.Serial(port=com_port, baudrate=baud_rate, timeout=serial_timeout)
except serial.SerialException as error:
LOGGER.exception("Serial Port could not be closed! Traceback: %s", str(error))
self.data = bytearray()
self.number_of_packets = 0
self.ser_com_type = ser_com_type
if self.ser_com_type == SerialCommunicationType.FIXED_FRAME_BASED:
self.serial_frame_size = com_type_args
def close(self):
try:
......@@ -57,12 +76,19 @@ class SerialComIF(CommunicationInterface):
# TODO: Serialization is performed here, but I suspect this might slow down the
# listener thread.. but at least no printing is done here.
# It would be better if the listener just polled the data and the deserialization
# is done somewhere else.
def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]:
if self.data_available():
pus_data_list, number_of_packets = self.__poll_pus_packets()
pus_data_list, number_of_packets = [], 0
if self.ser_com_type == SerialCommunicationType.FIXED_FRAME_BASED:
self.data = self.serial.read(self.serial_frame_size)
pus_data_list = self.poll_pus_packets_fixed_frames(bytearray(self.data))
else:
LOGGER.warning("This communication type was not implemented yet!")
packet_list = []
for counter in range(0, number_of_packets):
packet = PusTelemetryFactory.create(bytearray(pus_data_list[counter]))
for pus_packet in pus_data_list:
packet = PusTelemetryFactory.create(pus_packet)
packet_list.append(packet)
return True, packet_list
return False, []
......@@ -79,58 +105,50 @@ class SerialComIF(CommunicationInterface):
elapsed_time = time.time() - start_time
return False
def __poll_pus_packets(self) -> Tuple[List[Optional[bytes]], int]:
@staticmethod
def poll_pus_packets_fixed_frames(data: bytearray) -> list:
pus_data_list = []
self.data = self.serial.read(SERIAL_FRAME_LENGTH)
payload_length = (self.data[4] << 8 | self.data[5])
if len(data) == 0:
return pus_data_list
payload_length = data[4] << 8 | data[5]
packet_size = payload_length + 7
if payload_length == 0:
return [], 0
read_size = len(self.data)
self.number_of_packets = 1
pus_data = self.data[0:packet_size]
return []
read_size = len(data)
pus_data = data[0:packet_size]
pus_data_list.append(pus_data)
self.__handle_multiple_packets(packet_size, read_size, pus_data_list)
return pus_data_list, self.number_of_packets
def __handle_multiple_packets(self, packet_size: int, read_size: int, pus_data_list: list):
end_of_buffer = read_size - 1
end_index = packet_size
while end_index < end_of_buffer:
end_index = self.__parse_next_packets(end_index, pus_data_list)
def __parse_next_packets(self, end_index: int, pus_data_list: list) -> int:
start_index = end_index
end_index = end_index + 5
next_payload_len = (self.data[end_index - 1] << 8 | self.data[end_index])
SerialComIF.read_multiple_packets(data, packet_size, read_size, pus_data_list)
return pus_data_list
@staticmethod
def read_multiple_packets(data: bytearray, start_index: int, frame_size: int,
pus_data_list: list):
while start_index < frame_size:
start_index = SerialComIF.parse_next_packets(data, start_index, frame_size,
pus_data_list)
@staticmethod
def parse_next_packets(data: bytearray, start_index: int, frame_size: int,
pus_data_list: list) -> int:
next_payload_len = data[start_index + 4] << 8 | data[start_index + 5]
if next_payload_len == 0:
end_index = frame_size
return end_index
next_packet_size = next_payload_len + 7
# remaining_size = frame_size - start_index
if next_packet_size > SERIAL_FRAME_LENGTH:
LOGGER.error("PUS Polling: Very large packet detected, "
"large packet reading not implemented yet !")
"packet splitting not implemented yet!")
LOGGER.error("Detected Size: " + str(next_packet_size))
end_index = frame_size
return end_index
if next_payload_len == 0:
end_index = SERIAL_FRAME_LENGTH
return end_index
end_index = start_index + next_packet_size
pus_data = self.data[start_index:end_index]
pus_data = data[start_index:end_index]
pus_data_list.append(pus_data)
self.number_of_packets = self.number_of_packets + 1
return end_index
# def receive_telemetry_and_store_tm(self, tm_queue: PusTmQueueT) -> Union[None, PusTmQueueT]:
# (packet_received, pus_packets) = self.poll_interface()
# if packet_received:
# for packet in pus_packets:
# tm_queue.append(packet)
# return tm_queue
#
# def receive_telemetry_and_store_info(self, tm_info_queue: PusTmInfoQueueT) -> \
# Union[None, PusTmInfoQueueT]:
# (packet_received, pus_packets) = self.poll_interface()
# if packet_received:
# for packet in pus_packets:
# tm_info = packet.pack_tm_information()
# tm_info_queue.append(tm_info)
# return tm_info_queue
......@@ -85,8 +85,9 @@ G_DISPLAY_MODE = "long"
G_COM_IF = 2
# COM Port for serial communication
G_COM_PORT = 'COM0'
G_SERIAL_TIMEOUT = 1
G_SERIAL_TIMEOUT = 0.5
G_SERIAL_BAUDRATE = 230400
G_SERIAL_FRAME_SIZE = 256
# Time related
G_TM_TIMEOUT = 6
G_TC_SEND_TIMEOUT_FACTOR = 2.0
......
......@@ -8,6 +8,7 @@ from enum import Enum
# Yeah, I did not have a better idea yet..
# Next step would be a configuration file
class Developer(Enum):
Robin = 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment