From 688bf7b41bbf722ce97a19998e43500e69e12010 Mon Sep 17 00:00:00 2001 From: "Robin.Mueller" <robin.mueller.m@gmail.com> Date: Wed, 29 Apr 2020 13:47:08 +0200 Subject: [PATCH] minor/form changes --- comIF/obsw_com_interface.py | 6 +-- comIF/obsw_qemu_com_if.py | 76 ++++++++++++++++++------------------ comIF/obsw_serial_com_if.py | 16 ++++---- utility/obsw_exit_handler.py | 15 ++++--- utility/obsw_tmtc_printer.py | 11 +++--- 5 files changed, 62 insertions(+), 62 deletions(-) diff --git a/comIF/obsw_com_interface.py b/comIF/obsw_com_interface.py index dc51109..76695e5 100644 --- a/comIF/obsw_com_interface.py +++ b/comIF/obsw_com_interface.py @@ -8,13 +8,11 @@ Description: Generic Communication Interface. Defines the syntax of the communic @author: R. Mueller """ from abc import abstractmethod -from typing import TypeVar, Tuple, Union -from tm.obsw_pus_tm_factory import PusTmQueueT, PusTmTupleQueueT, PusTmInfoQueueT, PusTmListT +from typing import Tuple +from tm.obsw_pus_tm_factory import PusTmListT from utility.obsw_tmtc_printer import TmTcPrinter from tc.obsw_pus_tc_base import PusTcInfoT -ComIfT = TypeVar('ComIfT', bound='CommunicationInterface') - # pylint: disable=useless-return # pylint: disable=no-self-use diff --git a/comIF/obsw_qemu_com_if.py b/comIF/obsw_qemu_com_if.py index fea94bb..aae10f7 100755 --- a/comIF/obsw_qemu_com_if.py +++ b/comIF/obsw_qemu_com_if.py @@ -1,12 +1,8 @@ #!/usr/bin/env python - """ -Example framework for testing the USARTTestTask of the OBSW. +QEMU Communication Interface to communicate with emulated QEMU hardware via the UART interface. -This file is intended to showcase how a USART device would be emulated in -python and connected to the QEMU emulator. The datastructures and functions in -this file should indicate what a simulation framework for the QEMU emulator -might look like. +It utilizes the the asyncio library. Requirements: Python >= 3.7 (asyncio support) @@ -18,19 +14,20 @@ Instructions: -bios path/to/sourceobsw-at91sam9g20_ek-sdram.bin \ -qmp unix:/tmp/qemu,server -S - Then run this script. + Then run the telecommand script with -c 2 """ - import asyncio import struct import json import re import errno -from comIF.obsw_com_interface import CommunicationInterface import time -from tm.obsw_pus_tm_factory import PusTelemetryFactory from threading import Thread +from typing import Tuple +from comIF.obsw_com_interface import CommunicationInterface, PusTcInfoT, PusTmListT +from tm.obsw_pus_tm_factory import PusTelemetryFactory + # Paths to Unix Domain Sockets used by the emulator QEMU_ADDR_QMP = "/tmp/qemu" QEMU_ADDR_AT91_USART0 = "/tmp/qemu_at91_usart0" @@ -55,59 +52,62 @@ def start_background_loop(loop: asyncio.AbstractEventLoop) -> None: class QEMUComIF(CommunicationInterface): + """ + Specific Communication Interface implementation of the QEMU USART protocol for the TMTC software + """ def __init__(self, tmtc_printer, tm_timeout, tc_timeout_factor): super().__init__(tmtc_printer) - self.tmTimeout = tm_timeout - self.tcTimeoutFactor = tc_timeout_factor + self.tm_timeout = tm_timeout + self.tc_timeout_factor = tc_timeout_factor self.loop = asyncio.get_event_loop() self.number_of_packets = 0 self.data = [] if not self.loop.is_running(): - self.t = Thread(target=start_background_loop, - args=(self.loop,), daemon=True) - self.t.start() + self.thread = Thread(target=start_background_loop, + args=(self.loop,), daemon=True) + self.thread.start() self.usart = asyncio.run_coroutine_threadsafe( - Usart.createAsync(QEMU_ADDR_AT91_USART0), self.loop).result() + Usart.create_async(QEMU_ADDR_AT91_USART0), self.loop).result() asyncio.run_coroutine_threadsafe(self.usart.open(), self.loop).result() - # Send Telecommand def close(self): self.usart.close() - def send_telecommand(self, tcPacket, tcPacketInfo=""): + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: asyncio.run_coroutine_threadsafe( - self.send_telecommand_async(tcPacket, tcPacketInfo), self.loop).result() + self.send_telecommand_async(tc_packet, tc_packet_info), self.loop).result() - async def send_telecommand_async(self, tc_packet, tcPacketInfo): - self.tmtc_printer.print_telecommand(tc_packet, tcPacketInfo) + async def send_telecommand_async(self, tc_packet, tc_packet_info): + self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info) await self.usart.write(tc_packet) self.usart.inject_timeout_error() def receive_telemetry(self, parameters=0): - packet_list = self.poll_interface() + (packet_received, packet_list) = self.poll_interface() return packet_list - def poll_interface(self, parameter=0): + def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]: packet_list = [] if self.data_available(): pus_data_list, number_of_packets = self.poll_pus_packets() for counter in range(0, number_of_packets): packet = PusTelemetryFactory.create(pus_data_list[counter]) packet_list.append(packet) - return packet_list + return True, packet_list + return False, packet_list - def data_available(self, timeout=0): - if self.usart.newDataAvailable(): + def data_available(self, timeout: any=0) -> bool: + if self.usart.new_data_available(): return True - elif timeout > 0: + if timeout > 0: start_time = time.time() elapsed_time = 0 while elapsed_time < timeout: - if self.usart.newDataAvailable(): + if self.usart.new_data_available(): return True elapsed_time = time.time() - start_time - return False + return False def poll_pus_packets(self): packets = self.poll_pus_packets_async() @@ -140,10 +140,10 @@ class QEMUComIF(CommunicationInterface): next_packet_size = next_payload_len + 7 if next_packet_size > 256: print("PUS Polling: Very large packet detected, " - "large packet reading not implemented yet !") + "large packet reading not implemented yet !") print("Detected Size: " + str(next_packet_size)) return end_index - elif next_payload_len == 0: + if next_payload_len == 0: end_index = 256 return end_index end_index = start_index + next_packet_size @@ -163,7 +163,6 @@ class QmpException(Exception): class QmpConnection: """A connection to a QEMU machine via QMP""" - def __init__(self, addr=QEMU_ADDR_QMP): self.transport = None self.addr = addr @@ -304,7 +303,8 @@ class DataFrame: return bytes([self.seq, self.cat, self.id, len(data)]) + bytes(data) def __repr__(self): - return f"{{ seq: 0x{self.seq:02x}, cat: 0x{self.cat:02x}, id: 0x{self.id:02x}, data: {self.data} }}" + return f"{{ seq: 0x{self.seq:02x}, cat: 0x{self.cat:02x}," \ + f" id: 0x{self.id:02x}, data: {self.data} }}" def parse_dataframes(buf): @@ -328,7 +328,7 @@ class UsartStatusException(Exception): class Usart: @staticmethod - async def createAsync(addr): + async def create_async(addr): return Usart(addr) """Connection to emulate a USART device for a given QEMU/At91 instance""" @@ -400,7 +400,7 @@ class Usart: if status != 0: raise UsartStatusException(status) - async def readAsync(self, n, timemout=1): + async def read_async(self, n, timemout=1): """Wait for 'n' bytes to be received from the USART timeout in seconds""" end_time = time.time()+timemout @@ -425,15 +425,15 @@ class Usart: data, self.datab = self.datab[:n], self.datab[n:] return data - def newDataAvailable(self) -> bool: + def new_data_available(self) -> bool: return not self.dataq.empty() def flush(self): while True: try: self.dataq.get_nowait() - except Exception as e: - print(e) + except Exception as error: + print(error) return def inject_overrun_error(self): diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index c7f4571..21c5dd8 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -1,11 +1,9 @@ """ @file obsw_serial_com_if.py -@date 01.11.2019 @brief Serial Communication Interface - @author R. Mueller +@date 01.11.2019 """ -import sys import time import logging from typing import Tuple, List, Optional @@ -18,7 +16,7 @@ from tc.obsw_pus_tc_base import PusTcInfoT from utility.obsw_logger import get_logger -logger = get_logger() +LOGGER = get_logger() SERIAL_FRAME_LENGTH = 256 HEADER_BYTES_BEFORE_SIZE = 5 @@ -34,8 +32,8 @@ class SerialComIF(CommunicationInterface): super().__init__(tmtc_printer) try: self.serial = serial.Serial(port=com_port, baudrate=baud_rate, timeout=serial_timeout) - except serial.SerialException as e: - logging.exception("Serial Port could not be closed! Traceback: " + str(e)) + except serial.SerialException as error: + LOGGER.exception("Serial Port could not be closed! Traceback: %s", str(error)) self.data = bytearray() self.number_of_packets = 0 @@ -104,11 +102,11 @@ class SerialComIF(CommunicationInterface): next_payload_len = (self.data[end_index - 1] << 8 | self.data[end_index]) next_packet_size = next_payload_len + 7 if next_packet_size > SERIAL_FRAME_LENGTH: - logger.error("PUS Polling: Very large packet detected, " + LOGGER.error("PUS Polling: Very large packet detected, " "large packet reading not implemented yet !") - logger.error("Detected Size: " + str(next_packet_size)) + LOGGER.error("Detected Size: " + str(next_packet_size)) return end_index - elif next_payload_len == 0: + if next_payload_len == 0: end_index = SERIAL_FRAME_LENGTH return end_index end_index = start_index + next_packet_size diff --git a/utility/obsw_exit_handler.py b/utility/obsw_exit_handler.py index ff9990b..c274cec 100644 --- a/utility/obsw_exit_handler.py +++ b/utility/obsw_exit_handler.py @@ -4,6 +4,14 @@ from utility.obsw_logger import get_logger logger = get_logger() +def keyboard_interrupt_handler(com_interface: CommunicationInterface): + logger.info("Disconnect registered") + # Unit Test closes Serial Port at the end + # We could do some optional stuff here + if com_interface is not None: + com_interface.send_telecommand(bytearray([0, 0, 0, 0, 0])) + + class GracefulKiller: kill_now = False @@ -16,9 +24,4 @@ class GracefulKiller: print("I was killed") -def keyboard_interrupt_handler(com_interface: CommunicationInterface): - logger.info("Disconnect registered") - # Unit Test closes Serial Port at the end - # We could do some optional stuff here - if com_interface is not None: - com_interface.send_telecommand(bytearray([0, 0, 0, 0, 0])) + diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index 470b643..c7e7ccf 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3.7 +#!/usr/bin/python3.8 # -*- coding: utf-8 -*- """ @file @@ -37,19 +37,20 @@ class TmTcPrinter: :param do_print_to_file: if true, print to file :param print_tc: if true, print TCs """ - # TODO: we should implement a list of strings here. each service test string - # is written into a list entry. For the file output, the list entries are concatenated - # and put into the main log file. self.display_mode = display_mode self.do_print_to_file = do_print_to_file self.print_tc = print_tc self.__print_buffer = "" # global print buffer which will be useful to print something to file self.__file_buffer = "" - # List implementation to store + # TODO: Full print not working yet + # List implementation to store multiple strings self.file_buffer_list = [] def print_telemetry_queue(self, tm_queue: PusTmQueueT): + """ + Print the telemetry queue which should contain lists of TM class instances. + """ for tm_list in tm_queue: for tm_packet in tm_list: self.print_telemetry(tm_packet) -- GitLab