From d2fa8fd56e4d70d2b0ccccb5b1061a46bf3aa733 Mon Sep 17 00:00:00 2001
From: "Robin.Mueller" <robin.mueller.m@gmail.com>
Date: Mon, 27 Apr 2020 18:27:52 +0200
Subject: [PATCH] qemu com if added

---
 comIF/obsw_com_config.py    |   9 +
 comIF/obsw_qemu_com_if.py   | 523 ++++++++++++++++++++++++++++++++++++
 config/obsw_config.py       |   5 +-
 utility/obsw_args_parser.py |   2 +-
 4 files changed, 537 insertions(+), 2 deletions(-)
 create mode 100755 comIF/obsw_qemu_com_if.py

diff --git a/comIF/obsw_com_config.py b/comIF/obsw_com_config.py
index 8e6e08e..a267d48 100644
--- a/comIF/obsw_com_config.py
+++ b/comIF/obsw_com_config.py
@@ -7,6 +7,7 @@ from comIF.obsw_com_interface import CommunicationInterface
 from comIF.obsw_dummy_com_if import DummyComIF
 from comIF.obsw_ethernet_com_if import EthernetComIF
 from comIF.obsw_serial_com_if import SerialComIF
+from comIF.obsw_qemu_com_if import QEMUComIF
 
 from utility.obsw_logger import get_logger
 from utility.obsw_tmtc_printer import TmTcPrinter
@@ -21,6 +22,8 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> CommunicationInter
     :param tmtc_printer: TmTcPrinter object.
     :return: CommunicationInterface object
     """
+    print("test")
+    print(g.G_COM_IF)
     try:
         if g.G_COM_IF == g.ComIF.Ethernet:
             communication_interface = EthernetComIF(
@@ -34,6 +37,12 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> CommunicationInter
             communication_interface = SerialComIF(
                 tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate,
                 serial_timeout=g.G_SERIAL_TIMEOUT)
+        elif g.G_COM_IF == g.ComIF.QEMU:
+            print("hallo")
+            communication_interface = QEMUComIF(
+                tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT,
+                tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR)
+
         else:
             communication_interface = DummyComIF(tmtc_printer=tmtc_printer)
         return communication_interface
diff --git a/comIF/obsw_qemu_com_if.py b/comIF/obsw_qemu_com_if.py
new file mode 100755
index 0000000..50c4287
--- /dev/null
+++ b/comIF/obsw_qemu_com_if.py
@@ -0,0 +1,523 @@
+#!/usr/bin/env python
+
+"""
+Example framework for testing the USARTTestTask of the OBSW.
+
+This file is intended to showcase how a USART device would be emulated in
+python and connected to the QEMU emulator. The datastructures and functions in
+this file should indicate what a simulation framework for the QEMU emulator
+might look like.
+
+Requirements:
+  Python >= 3.7 (asyncio support)
+
+Instructions:
+  Run QEMU (modified for OBSW) via
+
+  qemu-system-arm -M isis-obc -monitor stdio \
+      -bios path/to/sourceobsw-at91sam9g20_ek-sdram.bin \
+      -qmp unix:/tmp/qemu,server -S
+
+  Then run this script.
+"""
+
+
+import asyncio
+import struct
+import json
+import re
+import errno
+from comIF.obsw_com_interface import CommunicationInterface
+import time
+from tm.obsw_pus_tm_factory import PusTelemetryFactory
+from threading import Thread
+# Paths to Unix Domain Sockets used by the emulator
+QEMU_ADDR_QMP = "/tmp/qemu"
+QEMU_ADDR_AT91_USART0 = "/tmp/qemu_at91_usart0"
+QEMU_ADDR_AT91_USART2 = "/tmp/qemu_at91_usart2"
+
+# Request/response category and command IDs
+IOX_CAT_DATA = 0x01
+IOX_CAT_FAULT = 0x02
+
+IOX_CID_DATA_IN = 0x01
+IOX_CID_DATA_OUT = 0x02
+
+IOX_CID_FAULT_OVRE = 0x01
+IOX_CID_FAULT_FRAME = 0x02
+IOX_CID_FAULT_PARE = 0x03
+IOX_CID_FAULT_TIMEOUT = 0x04
+
+
+def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
+    asyncio.set_event_loop(loop)
+    loop.run_forever()
+
+
+class QEMUComIF(CommunicationInterface):
+    def __init__(self, tmtcPrinter, tmTimeout, tcTimeoutFactor):
+        super().__init__(tmtcPrinter)
+        self.tmTimeout = tmTimeout
+        self.tcTimeoutFactor = tcTimeoutFactor
+        self.loop = asyncio.get_event_loop()
+        self.number_of_packets = 0
+        if(not self.loop.is_running()):
+            self.t = Thread(target=start_background_loop,
+                            args=(self.loop,), daemon=True)
+            self.t.start()
+
+        self.usart = asyncio.run_coroutine_threadsafe(
+            Usart.createAsync(QEMU_ADDR_AT91_USART0), self.loop).result()
+        asyncio.run_coroutine_threadsafe(self.usart.open(), self.loop).result()
+
+    # Send Telecommand
+    def close(self):
+        self.usart.close()
+
+    def send_telecommand(self, tcPacket, tcPacketInfo=""):
+        asyncio.run_coroutine_threadsafe(
+            self.sendTelecommandAsync(tcPacket, tcPacketInfo), self.loop).result()
+
+    async def sendTelecommandAsync(self, tcPacket, tcPacketInfo):
+        self.tmtc_printer.print_telecommand(tcPacket, tcPacketInfo)
+        await self.usart.write(tcPacket)
+        self.usart.inject_timeout_error()
+
+    def receive_telemetry(self, parameters=0):
+        packetList = self.poll_interface()
+        return packetList
+
+    def poll_interface(self, parameter=0):
+        if self.data_available():
+            pusDataList, numberOfPackets = self.pollPusPackets()
+            packetList = []
+            for counter in range(0, numberOfPackets):
+                packet = PusTelemetryFactory.create(pusDataList[counter])
+                # self.tmtc_printer.print_telemetry(packet)
+                packetList.append(packet)
+            return packetList
+
+    def data_available(self, timeout=0):
+        if self.usart.newDataAvailable():
+            return True
+        elif timeout > 0:
+            start_time = time.time()
+            elapsed_time = 0
+            while elapsed_time < timeout:
+                if self.usart.newDataAvailable():
+                    return True
+                elapsed_time = time.time() - start_time
+            return False
+
+    def pollPusPackets(self):
+        packets = self.pollPusPacketsAsync()
+        return packets
+
+    def pollPusPacketsAsync(self):
+        pus_data_list = []
+        self.data = self.usart.read(256)
+        payload_length = (self.data[4] << 8 | self.data[5])
+        packet_size = payload_length + 7
+        if payload_length == 0:
+            return [], 0
+        read_size = len(self.data)
+        self.number_of_packets = 1
+        pus_data = self.data[0:packet_size]
+        pus_data_list.append(pus_data)
+        self.__handle_multiple_packets(packet_size, read_size, pus_data_list)
+        return pus_data_list, self.number_of_packets
+
+    def __handle_multiple_packets(self, packet_size, read_size, pus_data_list):
+        end_of_buffer = read_size - 1
+        end_index = packet_size
+        while end_index < end_of_buffer:
+            end_index = self.__parse_next_packets(end_index, pus_data_list)
+
+    def __parse_next_packets(self, end_index: int, pus_data_list: list) -> int:
+        start_index = end_index
+        end_index = end_index + 5
+        next_payload_len = (self.data[end_index - 1] << 8 | self.data[end_index])
+        next_packet_size = next_payload_len + 7
+        if next_packet_size > 256:
+            print("PUS Polling: Very large packet detected, "
+                         "large packet reading not implemented yet !")
+            print("Detected Size: " + str(next_packet_size))
+            return end_index
+        elif next_payload_len == 0:
+            end_index = 256
+            return end_index
+        end_index = start_index + next_packet_size
+        pus_data = self.data[start_index:end_index]
+        pus_data_list.append(pus_data)
+        self.number_of_packets = self.number_of_packets + 1
+        return end_index
+
+    def performListenerMode(self):
+        print("Listening for packages ...")
+        while True:
+            self.pollInterface()
+
+    def receiveTelemetryAndStoreIntoQueue(self, tmQueue):
+        packet = self.pollInterface()
+        tmQueue.put(packet)
+
+    def receiveTelemetryAndStoreTuple(self, tmTupleQueue):
+        packet = self.pollInterface()
+        tmInfo = packet.packTmInformation()
+        tmTupleQueue.put(packet, tmInfo)
+
+
+class QmpException(Exception):
+    """An exception caused by the QML/QEMU as response to a failed command"""
+
+    def __init__(self, ret, *args, **kwargs):
+        Exception.__init__(self, f"QMP error: {ret}")
+        self.ret = ret  # the 'return' structure provided by QEMU/QML
+
+
+class QmpConnection:
+    """A connection to a QEMU machine via QMP"""
+
+    def __init__(self, addr=QEMU_ADDR_QMP):
+        self.transport = None
+        self.addr = addr
+        self.dataq = asyncio.Queue()
+        self.initq = asyncio.Queue()
+        self.proto = None
+
+    def _protocol(self):
+        """The underlying transport protocol"""
+
+        if self.proto is None:
+            self.proto = QmpProtocol(self)
+
+        return self.proto
+
+    async def _wait_check_return(self):
+        """
+        Wait for the status return of a command and raise an exception if it
+        indicates a failure
+        """
+
+        resp = await self.dataq.get()
+        if resp["return"]:
+            raise QmpException(resp["return"])
+
+    async def open(self):
+        """
+        Open this connection. Connect to the machine ensure that the
+        connection is ready to use after this call.
+        """
+
+        loop = asyncio.get_event_loop()
+        await loop.create_unix_connection(self._protocol, self.addr)
+
+        # wait for initial capabilities and version
+        init = await self.initq.get()
+        print(init)
+
+        # negotioate capabilities
+        cmd = '{ "execute": "qmp_capabilities" }'
+        self.transport.write(bytes(cmd, "utf-8"))
+        await self._wait_check_return()
+
+        return self
+
+    def close(self):
+        """Close this connection"""
+
+        if self.transport is not None:
+            self.transport.close()
+            self.transport = None
+            self.proto = None
+
+    async def __aenter__(self):
+        await self.open()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        self.close()
+
+    async def cont(self):
+        """Continue machine execution if it has been paused"""
+
+        cmd = '{ "execute": "cont" }'
+        self.transport.write(bytes(cmd, "utf-8"))
+        await self._wait_check_return()
+
+    async def stop(self):
+        """Stop/pause machine execution"""
+
+        cmd = '{ "execute": "stop" }'
+        self.transport.write(bytes(cmd, "utf-8"))
+        await self._wait_check_return()
+
+    async def quit(self):
+        """
+        Quit the emulation. This causes the emulator to (non-gracefully)
+        shut down and close.
+        """
+
+        cmd = '{ "execute": "quit" }'
+        self.transport.write(bytes(cmd, "utf-8"))
+        await self._wait_check_return()
+
+
+class QmpProtocol(asyncio.Protocol):
+    """The QMP transport protocoll implementation"""
+
+    def __init__(self, conn):
+        self.conn = conn
+
+    def connection_made(self, transport):
+        self.conn.transport = transport
+
+    def connection_lost(self, exc):
+        self.conn.transport = None
+        self.conn.proto = None
+
+    def data_received(self, data):
+        data = str(data, "utf-8")
+        decoder = json.JSONDecoder()
+        nows = re.compile(r"[^\s]")
+
+        pos = 0
+        while True:
+            match = nows.search(data, pos)
+            if not match:
+                return
+
+            pos = match.start()
+            obj, pos = decoder.raw_decode(data, pos)
+
+            if "return" in obj:
+                self.conn.dataq.put_nowait(obj)
+            elif "QMP" in obj:
+                self.conn.initq.put_nowait(obj)
+            elif "event" in obj:
+                pass
+            else:
+                print("qmp:", obj)
+
+
+class DataFrame:
+    """
+    Basic protocol unit for communication via the IOX API introduced for
+    external device emulation
+    """
+
+    def __init__(self, seq, cat, id, data=None):
+        self.seq = seq
+        self.cat = cat
+        self.id = id
+        self.data = data
+
+    def bytes(self):
+        """Convert this protocol unit to raw bytes"""
+        data = self.data if self.data is not None else []
+        return bytes([self.seq, self.cat, self.id, len(data)]) + bytes(data)
+
+    def __repr__(self):
+        return f"{{ seq: 0x{self.seq:02x}, cat: 0x{self.cat:02x}, id: 0x{self.id:02x}, data: {self.data} }}"
+
+
+def parse_dataframes(buf):
+    """Parse a variable number of DataFrames from the given byte buffer"""
+
+    while len(buf) >= 4 and len(buf) >= 4 + buf[3]:
+        frame = DataFrame(buf[0], buf[1], buf[2], buf[4: 4 + buf[3]])
+        buf = buf[4 + buf[3]:]
+        yield buf, frame
+
+    return buf, None
+
+
+class UsartStatusException(Exception):
+    """An exception returned by the USART send command"""
+
+    def __init__(self, errn, *args, **kwargs):
+        Exception.__init__(self, f"USART error: {errno.errorcode[errn]}")
+        self.errno = errn  # a UNIX error code indicating the reason
+
+
+class Usart:
+    @staticmethod
+    async def createAsync(addr):
+        return Usart(addr)
+    """Connection to emulate a USART device for a given QEMU/At91 instance"""
+
+    def __init__(self, addr):
+        self.addr = addr
+        self.respd = dict()
+        self.respc = asyncio.Condition()
+        self.dataq = asyncio.Queue()
+        self.datab = bytes()
+        self.transport = None
+        self.proto = None
+        self.seq = 0
+
+    def _protocol(self):
+        """The underlying transport protocol"""
+
+        if self.proto is None:
+            self.proto = UsartProtocol(self)
+
+        return self.proto
+
+    async def open(self):
+        """Open this connection"""
+
+        loop = asyncio.get_event_loop()
+        await loop.create_unix_connection(self._protocol, self.addr)
+        return self
+
+    def close(self):
+        """Close this connection"""
+
+        if self.transport is not None:
+            self.transport.close()
+            self.transport = None
+            self.proto = None
+
+    async def __aenter__(self):
+        await self.open()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        self.close()
+
+    def _send_new_frame(self, cat, cid, data=None):
+        """
+        Send a DataFrame with the given parameters and auto-increase the
+        sequence counter. Return its sequence number.
+        """
+        self.seq = (self.seq + 1) & 0x7F
+
+        frame = DataFrame(self.seq, cat, cid, data)
+        self.transport.write(frame.bytes())
+
+        return frame.seq
+
+    async def write(self, data):
+        """Write data (bytes) to the USART device"""
+
+        seq = self._send_new_frame(IOX_CAT_DATA, IOX_CID_DATA_IN, data)
+
+        async with self.respc:
+            while seq not in self.respd.keys():
+                await self.respc.wait()
+
+            resp = self.respd[seq]
+            del self.respd[seq]
+
+        status = struct.unpack("I", resp.data)[0]
+        if status != 0:
+            raise UsartStatusException(status)
+
+    async def readAsync(self, n, timemout=1):
+        """Wait for 'n' bytes to be received from the USART
+            timeout in seconds"""
+        end_time = time.time()+timemout
+
+        while len(self.datab) < n and time.time() < end_time:
+            frame = await self.dataq.get()
+            self.datab += frame.data
+
+        data, self.datab = self.datab[:n], self.datab[n:]
+        return data
+
+    def read(self, n):
+        """Wait for 'n' bytes to be received from the USART
+            timeout in seconds"""
+
+        try:
+            while len(self.datab) < n:
+                frame = self.dataq.get_nowait()
+                self.datab += frame.data
+        # todo better solution
+        finally:
+            data, self.datab = self.datab[:n], self.datab[n:]
+            return data
+
+    def newDataAvailable(self) -> bool:
+        return not self.dataq.empty()
+
+    def flush(self):
+        while True:
+            try:
+                self.dataq.get_nowait()
+            except Exception as e:
+                print(e)
+                return
+
+    def inject_overrun_error(self):
+        """Inject an overrun error (set CSR_OVRE)"""
+        self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_OVRE)
+
+    def inject_frame_error(self):
+        """Inject a frame error (set CSR_FRAME)"""
+        self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_FRAME)
+
+    def inject_parity_error(self):
+        """Inject a parity error (set CSR_PARE)"""
+        self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_PARE)
+
+    def inject_timeout_error(self):
+        """Inject a timeout (set CSR_TIMEOUT)"""
+        self._send_new_frame(IOX_CAT_FAULT, IOX_CID_FAULT_TIMEOUT)
+
+
+class UsartProtocol(asyncio.Protocol):
+    """The USART transport protocoll implementation"""
+
+    def __init__(self, conn):
+        self.conn = conn
+        self.buf = bytes()
+
+    def connection_made(self, transport):
+        self.conn.transport = transport
+
+    def connection_lost(self, exc):
+        self.conn.transport = None
+        self.conn.proto = None
+
+    def data_received(self, data):
+        self.buf += data
+
+        for buf, frame in parse_dataframes(self.buf):
+            self.buf = buf
+
+            if frame.cat == IOX_CAT_DATA and frame.id == IOX_CID_DATA_OUT:
+                # data from CPU/board to device
+                self.conn.dataq.put_nowait(frame)
+            elif frame.cat == IOX_CAT_DATA and frame.id == IOX_CID_DATA_IN:
+                # response for data from device to CPU/board
+                loop = asyncio.get_event_loop()
+                loop.create_task(self._data_response_received(frame))
+
+    async def _data_response_received(self, frame):
+        async with self.conn.respc:
+            self.conn.respd[frame.seq] = frame
+            self.conn.respc.notify_all()
+
+
+async def main():
+    # Instantiate the connection classes we need. The connections are not
+    # opened automatically. Use '.open()' or 'async with machine as m'.
+    usart = Usart(QEMU_ADDR_AT91_USART0)
+
+    await usart.open()
+
+    while True:
+        command = PUSTelecommand(17, 1)
+        await usart.write(command.pack())
+        usart.inject_timeout_error()
+
+        response = await usart.read(68)
+
+        test = PUSTelemetryFactory(response)
+        print(test.packTmInformation())
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/config/obsw_config.py b/config/obsw_config.py
index 6a3b434..8a675a1 100644
--- a/config/obsw_config.py
+++ b/config/obsw_config.py
@@ -30,7 +30,8 @@ class ModeList(enum.Enum):
 class ComIF(enum.Enum):
     Ethernet = 0
     Serial = 1
-    Dummy = 2
+    QEMU = 2
+    Dummy = 3
 
 
 """
@@ -135,6 +136,8 @@ def set_globals(args):
         G_COM_IF = ComIF.Ethernet
     elif args.com_if == 1:
         G_COM_IF = ComIF.Serial
+    elif args.com_if == 2:
+        G_COM_IF = ComIF.QEMU
     else:
         G_COM_IF = ComIF.Dummy
 
diff --git a/utility/obsw_args_parser.py b/utility/obsw_args_parser.py
index d5b3bb7..7375643 100644
--- a/utility/obsw_args_parser.py
+++ b/utility/obsw_args_parser.py
@@ -54,7 +54,7 @@ def parse_input_arguments():
         arg_parser.print_help()
     args, unknown = arg_parser.parse_known_args()
     for argument in vars(args):
-        # logger.debug(argument + ": " + str(getattr(args, argument)))
+        LOGGER.debug(argument + ": " + str(getattr(args, argument)))
         pass
     handle_args(args, unknown)
     return args
-- 
GitLab