Skip to content
Snippets Groups Projects
Commit df8d60c5 authored by Robin Mueller's avatar Robin Mueller
Browse files

generci framework for serial dle reception created

parent 43a0d951
No related branches found
No related tags found
No related merge requests found
......@@ -55,6 +55,12 @@ class SerialComIF(CommunicationInterface):
self.ser_com_type = ser_com_type
if self.ser_com_type == SerialCommunicationType.FIXED_FRAME_BASED:
self.serial_frame_size = com_type_args
elif self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
# todo: We need a separate thread which does nothing but poll the serial interface for
# data (with the option to disable listening temporarily) and writes the data
# into a ring buffer (Python: deque with maxlen). We also need a lock because 2
# threads use the deque
pass
def close(self):
try:
......@@ -74,24 +80,26 @@ class SerialComIF(CommunicationInterface):
return packet_list
return []
# TODO: Serialization is performed here, but I suspect this might slow down the
# listener thread.. but at least no printing is done here.
# It would be better if the listener just polled the data and the deserialization
# is done somewhere else.
def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]:
if self.data_available():
pus_data_list, number_of_packets = [], 0
if self.ser_com_type == SerialCommunicationType.FIXED_FRAME_BASED:
if self.ser_com_type == SerialCommunicationType.FIXED_FRAME_BASED:
if self.data_available():
self.data = self.serial.read(self.serial_frame_size)
pus_data_list = self.poll_pus_packets_fixed_frames(bytearray(self.data))
else:
LOGGER.warning("This communication type was not implemented yet!")
packet_list = []
for pus_packet in pus_data_list:
packet = PusTelemetryFactory.create(pus_packet)
packet_list.append(packet)
return True, packet_list
return False, []
packet_list = []
for pus_packet in pus_data_list:
packet = PusTelemetryFactory.create(pus_packet)
packet_list.append(packet)
return True, packet_list
return False, []
elif self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
# todo: the listener thread will fill a deque with a maximum length ( = ring buffer).
# If the number of bytes contained is larger than 0, we have to analyze the
# buffer for STX and ETX chars. We should propably copy all bytes in deque
# into a member list and then start scanning for STX and ETX. The deque should
# be protected with a lock when performing copy operations.
LOGGER.warning("This communication type was not implemented yet!")
else:
LOGGER.warning("This communication type was not implemented yet!")
def data_available(self, timeout: float = 0) -> bool:
if self.serial.in_waiting > 0:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment