diff --git a/comIF/obsw_com_config.py b/comIF/obsw_com_config.py index 8e6e08e1c4f2236f9d4cc4433a0b8a2201e0ba43..a267d48e3b962b45064679b2862a201cfe8b039e 100644 --- a/comIF/obsw_com_config.py +++ b/comIF/obsw_com_config.py @@ -7,6 +7,7 @@ from comIF.obsw_com_interface import CommunicationInterface from comIF.obsw_dummy_com_if import DummyComIF from comIF.obsw_ethernet_com_if import EthernetComIF from comIF.obsw_serial_com_if import SerialComIF +from comIF.obsw_qemu_com_if import QEMUComIF from utility.obsw_logger import get_logger from utility.obsw_tmtc_printer import TmTcPrinter @@ -21,6 +22,8 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> CommunicationInter :param tmtc_printer: TmTcPrinter object. :return: CommunicationInterface object """ + print("test") + print(g.G_COM_IF) try: if g.G_COM_IF == g.ComIF.Ethernet: communication_interface = EthernetComIF( @@ -34,6 +37,12 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> CommunicationInter communication_interface = SerialComIF( tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate, serial_timeout=g.G_SERIAL_TIMEOUT) + elif g.G_COM_IF == g.ComIF.QEMU: + print("hallo") + communication_interface = QEMUComIF( + tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, + tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR) + else: communication_interface = DummyComIF(tmtc_printer=tmtc_printer) return communication_interface diff --git a/comIF/obsw_qemu_com_if.py b/comIF/obsw_qemu_com_if.py new file mode 100755 index 0000000000000000000000000000000000000000..50c4287d653e9b5fc616a78a9dff66c0c3b6d946 --- /dev/null +++ b/comIF/obsw_qemu_com_if.py @@ -0,0 +1,523 @@ +#!/usr/bin/env python + +""" +Example framework for testing the USARTTestTask of the OBSW. + +This file is intended to showcase how a USART device would be emulated in +python and connected to the QEMU emulator. The datastructures and functions in +this file should indicate what a simulation framework for the QEMU emulator +might look like. + +Requirements: + Python >= 3.7 (asyncio support) + +Instructions: + Run QEMU (modified for OBSW) via + + qemu-system-arm -M isis-obc -monitor stdio \ + -bios path/to/sourceobsw-at91sam9g20_ek-sdram.bin \ + -qmp unix:/tmp/qemu,server -S + + Then run this script. +""" + + +import asyncio +import struct +import json +import re +import errno +from comIF.obsw_com_interface import CommunicationInterface +import time +from tm.obsw_pus_tm_factory import PusTelemetryFactory +from threading import Thread +# Paths to Unix Domain Sockets used by the emulator +QEMU_ADDR_QMP = "/tmp/qemu" +QEMU_ADDR_AT91_USART0 = "/tmp/qemu_at91_usart0" +QEMU_ADDR_AT91_USART2 = "/tmp/qemu_at91_usart2" + +# Request/response category and command IDs +IOX_CAT_DATA = 0x01 +IOX_CAT_FAULT = 0x02 + +IOX_CID_DATA_IN = 0x01 +IOX_CID_DATA_OUT = 0x02 + +IOX_CID_FAULT_OVRE = 0x01 +IOX_CID_FAULT_FRAME = 0x02 +IOX_CID_FAULT_PARE = 0x03 +IOX_CID_FAULT_TIMEOUT = 0x04 + + +def start_background_loop(loop: asyncio.AbstractEventLoop) -> None: + asyncio.set_event_loop(loop) + loop.run_forever() + + +class QEMUComIF(CommunicationInterface): + def __init__(self, tmtcPrinter, tmTimeout, tcTimeoutFactor): + super().__init__(tmtcPrinter) + self.tmTimeout = tmTimeout + self.tcTimeoutFactor = tcTimeoutFactor + self.loop = asyncio.get_event_loop() + self.number_of_packets = 0 + if(not self.loop.is_running()): + self.t = Thread(target=start_background_loop, + args=(self.loop,), daemon=True) + self.t.start() + + self.usart = asyncio.run_coroutine_threadsafe( + Usart.createAsync(QEMU_ADDR_AT91_USART0), self.loop).result() + asyncio.run_coroutine_threadsafe(self.usart.open(), self.loop).result() + + # Send Telecommand + def close(self): + self.usart.close() + + def send_telecommand(self, tcPacket, tcPacketInfo=""): + asyncio.run_coroutine_threadsafe( + self.sendTelecommandAsync(tcPacket, tcPacketInfo), self.loop).result() + + async def sendTelecommandAsync(self, tcPacket, tcPacketInfo): + self.tmtc_printer.print_telecommand(tcPacket, tcPacketInfo) + await self.usart.write(tcPacket) + self.usart.inject_timeout_error() + + def receive_telemetry(self, parameters=0): + packetList = self.poll_interface() + return packetList + + def poll_interface(self, parameter=0): + if self.data_available(): + pusDataList, numberOfPackets = self.pollPusPackets() + packetList = [] + for counter in range(0, numberOfPackets): + packet = PusTelemetryFactory.create(pusDataList[counter]) + # self.tmtc_printer.print_telemetry(packet) + packetList.append(packet) + return packetList + + def data_available(self, timeout=0): + if self.usart.newDataAvailable(): + return True + elif timeout > 0: + start_time = time.time() + elapsed_time = 0 + while elapsed_time < timeout: + if self.usart.newDataAvailable(): + return True + elapsed_time = time.time() - start_time + return False + + def pollPusPackets(self): + packets = self.pollPusPacketsAsync() + return packets + + def pollPusPacketsAsync(self): + pus_data_list = [] + self.data = self.usart.read(256) + payload_length = (self.data[4] << 8 | self.data[5]) + packet_size = payload_length + 7 + if payload_length == 0: + return [], 0 + read_size = len(self.data) + self.number_of_packets = 1 + pus_data = self.data[0:packet_size] + pus_data_list.append(pus_data) + self.__handle_multiple_packets(packet_size, read_size, pus_data_list) + return pus_data_list, self.number_of_packets + + def __handle_multiple_packets(self, packet_size, read_size, pus_data_list): + end_of_buffer = read_size - 1 + end_index = packet_size + while end_index < end_of_buffer: + end_index = self.__parse_next_packets(end_index, pus_data_list) + + def __parse_next_packets(self, end_index: int, pus_data_list: list) -> int: + start_index = end_index + end_index = end_index + 5 + next_payload_len = (self.data[end_index - 1] << 8 | self.data[end_index]) + next_packet_size = next_payload_len + 7 + if next_packet_size > 256: + print("PUS Polling: Very large packet detected, " + "large packet reading not implemented yet !") + print("Detected Size: " + str(next_packet_size)) + return end_index + elif next_payload_len == 0: + end_index = 256 + return end_index + end_index = start_index + next_packet_size + pus_data = self.data[start_index:end_index] + pus_data_list.append(pus_data) + self.number_of_packets = self.number_of_packets + 1 + return end_index + + def performListenerMode(self): + print("Listening for packages ...") + while True: + self.pollInterface() + + def receiveTelemetryAndStoreIntoQueue(self, tmQueue): + packet = self.pollInterface() + tmQueue.put(packet) + + def receiveTelemetryAndStoreTuple(self, tmTupleQueue): + packet = self.pollInterface() + tmInfo = packet.packTmInformation() + tmTupleQueue.put(packet, tmInfo) + + +class QmpException(Exception): + """An exception caused by the QML/QEMU as response to a failed command""" + + def __init__(self, ret, *args, **kwargs): + Exception.__init__(self, f"QMP error: {ret}") + self.ret = ret # the 'return' structure provided by QEMU/QML + + +class QmpConnection: + """A connection to a QEMU machine via QMP""" + + def __init__(self, addr=QEMU_ADDR_QMP): + self.transport = None + self.addr = addr + self.dataq = asyncio.Queue() + self.initq = asyncio.Queue() + self.proto = None + + def _protocol(self): + """The underlying transport protocol""" + + if self.proto is None: + self.proto = QmpProtocol(self) + + return self.proto + + async def _wait_check_return(self): + """ + Wait for the status return of a command and raise an exception if it + indicates a failure + """ + + resp = await self.dataq.get() + if resp["return"]: + raise QmpException(resp["return"]) + + async def open(self): + """ + Open this connection. Connect to the machine ensure that the + connection is ready to use after this call. + """ + + loop = asyncio.get_event_loop() + await loop.create_unix_connection(self._protocol, self.addr) + + # wait for initial capabilities and version + init = await self.initq.get() + print(init) + + # negotioate capabilities + cmd = '{ "execute": "qmp_capabilities" }' + self.transport.write(bytes(cmd, "utf-8")) + await self._wait_check_return() + + return self + + def close(self): + """Close this connection""" + + if self.transport is not None: + self.transport.close() + self.transport = None + self.proto = None + + async def __aenter__(self): + await self.open() + return self + + async def __aexit__(self, exc_type, exc, tb): + self.close() + + async def cont(self): + """Continue machine execution if it has been paused""" + + cmd = '{ "execute": "cont" }' + self.transport.write(bytes(cmd, "utf-8")) + await self._wait_check_return() + + async def stop(self): + """Stop/pause machine execution""" + + cmd = '{ "execute": "stop" }' + self.transport.write(bytes(cmd, "utf-8")) + await self._wait_check_return() + + async def quit(self): + """ + Quit the emulation. This causes the emulator to (non-gracefully) + shut down and close. + """ + + cmd = '{ "execute": "quit" }' + self.transport.write(bytes(cmd, "utf-8")) + await self._wait_check_return() + + +class QmpProtocol(asyncio.Protocol): + """The QMP transport protocoll implementation""" + + def __init__(self, conn): + self.conn = conn + + def connection_made(self, transport): + self.conn.transport = transport + + def connection_lost(self, exc): + self.conn.transport = None + self.conn.proto = None + + def data_received(self, data): + data = str(data, "utf-8") + decoder = json.JSONDecoder() + nows = re.compile(r"[^\s]") + + pos = 0 + while True: + match = nows.search(data, pos) + if not match: + return + + pos = match.start() + obj, pos = decoder.raw_decode(data, pos) + + if "return" in obj: + self.conn.dataq.put_nowait(obj) + elif "QMP" in obj: + self.conn.initq.put_nowait(obj) + elif "event" in obj: + pass + else: + print("qmp:", obj) + + +class DataFrame: + """ + Basic protocol unit for communication via the IOX API introduced for + external device emulation + """ + + def __init__(self, seq, cat, id, data=None): + self.seq = seq + self.cat = cat + self.id = id + self.data = data + + def bytes(self): + """Convert this protocol unit to raw bytes""" + data = self.data if self.data is not None else [] + return bytes([self.seq, self.cat, self.id, len(data)]) + bytes(data) + + def __repr__(self): + return f"{{ seq: 0x{self.seq:02x}, cat: 0x{self.cat:02x}, id: 0x{self.id:02x}, data: {self.data} }}" + + +def parse_dataframes(buf): + """Parse a variable number of DataFrames from the given byte buffer""" + + while len(buf) >= 4 and len(buf) >= 4 + buf[3]: + frame = DataFrame(buf[0], buf[1], buf[2], buf[4: 4 + buf[3]]) + buf = buf[4 + buf[3]:] + yield buf, frame + + return buf, None + + +class UsartStatusException(Exception): + """An exception returned by the USART send command""" + + def __init__(self, errn, *args, **kwargs): + Exception.__init__(self, f"USART error: {errno.errorcode[errn]}") + self.errno = errn # a UNIX error code indicating the reason + + +class Usart: + @staticmethod + async def createAsync(addr): + return Usart(addr) + """Connection to emulate a USART device for a given QEMU/At91 instance""" + + def __init__(self, addr): + self.addr = addr + self.respd = dict() + self.respc = asyncio.Condition() + self.dataq = asyncio.Queue() + self.datab = bytes() + self.transport = None + self.proto = None + self.seq = 0 + + def _protocol(self): + """The underlying transport protocol""" + + if self.proto is None: + self.proto = UsartProtocol(self) + + return self.proto + + async def open(self): + """Open this connection""" + + loop = asyncio.get_event_loop() + await loop.create_unix_connection(self._protocol, self.addr) + return self + + def close(self): + """Close this connection""" + + if self.transport is not None: + self.transport.close() + self.transport = None + self.proto = None + + async def __aenter__(self): + await self.open() + return self + + async def __aexit__(self, exc_type, exc, tb): + self.close() + + def _send_new_frame(self, cat, cid, data=None): + """ + Send a DataFrame with the given parameters and auto-increase the + sequence counter. Return its sequence number. + """ + self.seq = (self.seq + 1) & 0x7F + + frame = DataFrame(self.seq, cat, cid, data) + self.transport.write(frame.bytes()) + + return frame.seq + + async def write(self, data): + """Write data (bytes) to the USART device""" + + seq = self._send_new_frame(IOX_CAT_DATA, IOX_CID_DATA_IN, data) + + async with self.respc: + while seq not in self.respd.keys(): + await self.respc.wait() + + resp = self.respd[seq] + del self.respd[seq] + + status = struct.unpack("I", resp.data)[0] + if status != 0: + raise UsartStatusException(status) + + async def readAsync(self, n, timemout=1): + """Wait for 'n' bytes to be received from the USART + timeout in seconds""" + end_time = time.time()+timemout + + while len(self.datab) < n and time.time() < end_time: + frame = await self.dataq.get() + self.datab += frame.data + + data, self.datab = self.datab[:n], self.datab[n:] + return data + + def read(self, n): + """Wait for 'n' bytes to be received from the USART + timeout in seconds""" + + try: + while len(self.datab) < n: + frame = self.dataq.get_nowait() + self.datab += frame.data + # todo better solution + finally: + data, self.datab = self.datab[:n], self.datab[n:] + return data + + def newDataAvailable(self) -> bool: + return not self.dataq.empty() + + def flush(self): + while True: + try: + self.dataq.get_nowait() + except Exception as e: + print(e) + return + + def inject_overrun_error(self): + """Inject an overrun error (set CSR_OVRE)""" + self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_OVRE) + + def inject_frame_error(self): + """Inject a frame error (set CSR_FRAME)""" + self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_FRAME) + + def inject_parity_error(self): + """Inject a parity error (set CSR_PARE)""" + self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_PARE) + + def inject_timeout_error(self): + """Inject a timeout (set CSR_TIMEOUT)""" + self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_TIMEOUT) + + +class UsartProtocol(asyncio.Protocol): + """The USART transport protocoll implementation""" + + def __init__(self, conn): + self.conn = conn + self.buf = bytes() + + def connection_made(self, transport): + self.conn.transport = transport + + def connection_lost(self, exc): + self.conn.transport = None + self.conn.proto = None + + def data_received(self, data): + self.buf += data + + for buf, frame in parse_dataframes(self.buf): + self.buf = buf + + if frame.cat == IOX_CAT_DATA and frame.id == IOX_CID_DATA_OUT: + # data from CPU/board to device + self.conn.dataq.put_nowait(frame) + elif frame.cat == IOX_CAT_DATA and frame.id == IOX_CID_DATA_IN: + # response for data from device to CPU/board + loop = asyncio.get_event_loop() + loop.create_task(self._data_response_received(frame)) + + async def _data_response_received(self, frame): + async with self.conn.respc: + self.conn.respd[frame.seq] = frame + self.conn.respc.notify_all() + + +async def main(): + # Instantiate the connection classes we need. The connections are not + # opened automatically. Use '.open()' or 'async with machine as m'. + usart = Usart(QEMU_ADDR_AT91_USART0) + + await usart.open() + + while True: + command = PUSTelecommand(17, 1) + await usart.write(command.pack()) + usart.inject_timeout_error() + + response = await usart.read(68) + + test = PUSTelemetryFactory(response) + print(test.packTmInformation()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/config/obsw_config.py b/config/obsw_config.py index 6a3b434d7977687031c1ef131295de7ed24287b8..8a675a103b028d149362f4f1dde627c10498ea24 100644 --- a/config/obsw_config.py +++ b/config/obsw_config.py @@ -30,7 +30,8 @@ class ModeList(enum.Enum): class ComIF(enum.Enum): Ethernet = 0 Serial = 1 - Dummy = 2 + QEMU = 2 + Dummy = 3 """ @@ -135,6 +136,8 @@ def set_globals(args): G_COM_IF = ComIF.Ethernet elif args.com_if == 1: G_COM_IF = ComIF.Serial + elif args.com_if == 2: + G_COM_IF = ComIF.QEMU else: G_COM_IF = ComIF.Dummy diff --git a/utility/obsw_args_parser.py b/utility/obsw_args_parser.py index d5b3bb7b89553cef9cc7f5ead68aab94e01beac7..73756432a4304c9d7c5550fd47135c0eb2965dc0 100644 --- a/utility/obsw_args_parser.py +++ b/utility/obsw_args_parser.py @@ -54,7 +54,7 @@ def parse_input_arguments(): arg_parser.print_help() args, unknown = arg_parser.parse_known_args() for argument in vars(args): - # logger.debug(argument + ": " + str(getattr(args, argument))) + LOGGER.debug(argument + ": " + str(getattr(args, argument))) pass handle_args(args, unknown) return args