Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
source
tcpip-to-serial-agent
Commits
51cecc0b
Unverified
Commit
51cecc0b
authored
Mar 29, 2022
by
Robin Mueller
Browse files
apply black afmt and added additional warning
parent
bfe20482
Changes
10
Hide whitespace changes
Inline
Side-by-side
agent/buffer.py
View file @
51cecc0b
...
...
@@ -23,7 +23,7 @@ def set_max_slots(max_slots: int):
def
init_reception_buffer
():
if
not
__MAX_SLOTS_SET
:
logger
=
get_console_logger
()
logger
.
warning
(
'
Maximum number of slots was not set with set_max_slots
'
)
logger
.
warning
(
"
Maximum number of slots was not set with set_max_slots
"
)
get_reception_buffer
()
...
...
agent/conf.py
View file @
51cecc0b
from
os.path
import
expanduser
JSON_CONF_DIR
=
expanduser
(
"~"
)
JSON_CONF_NAME
=
'
.tmtc-agent.json
'
JSON_CONF_FILE
=
f
'
{
JSON_CONF_DIR
}
/
{
JSON_CONF_NAME
}
'
JSON_CONF_NAME
=
"
.tmtc-agent.json
"
JSON_CONF_FILE
=
f
"
{
JSON_CONF_DIR
}
/
{
JSON_CONF_NAME
}
"
MAX_DLE_FRAME_LEN
=
1500
DEFAULT_BAUDRATE
=
230400
DEFAULT_TCP_PORT
=
7301
...
...
agent/dle_encoder.py
View file @
51cecc0b
...
...
@@ -16,12 +16,12 @@ DLE_CHAR = 0x10
class
DleErrorCodes
(
enum
.
Enum
):
OK
=
0
,
OK
=
(
0
,
)
DECODING_ERROR
=
1
def
encode_dle
(
source_packet
:
bytearray
,
add_stx_etx
:
bool
=
True
,
encode_cr
:
bool
=
False
source_packet
:
bytearray
,
add_stx_etx
:
bool
=
True
,
encode_cr
:
bool
=
False
)
->
bytearray
:
"""Encodes a given stream with DLE encoding.
:return: Returns a tuple with 2 values.
...
...
@@ -37,8 +37,9 @@ def encode_dle(
while
source_index
<
source_len
:
next_byte
=
source_packet
[
source_index
]
# STX, ETX and CR characters in the stream need to be escaped with DLE
if
(
next_byte
==
STX_CHAR
or
next_byte
==
ETX_CHAR
)
or
\
(
encode_cr
and
next_byte
==
CARRIAGE_RETURN
):
if
(
next_byte
==
STX_CHAR
or
next_byte
==
ETX_CHAR
)
or
(
encode_cr
and
next_byte
==
CARRIAGE_RETURN
):
dest_stream
.
append
(
DLE_CHAR
)
"""Escaped byte will be actual byte + 0x40. This prevents
STX and ETX characters from appearing
...
...
@@ -61,7 +62,7 @@ def encode_dle(
def
decode_dle
(
source_packet
:
bytearray
,
decode_cr
:
bool
=
False
source_packet
:
bytearray
,
decode_cr
:
bool
=
False
)
->
Tuple
[
DleErrorCodes
,
bytearray
,
int
]:
"""Decodes a given DLE encoded data stream. This call only returns the first packet found.
...
...
@@ -82,14 +83,19 @@ def decode_dle(
return
DleErrorCodes
.
DECODING_ERROR
,
dest_stream
,
0
encoded_index
+=
1
while
encoded_index
<
source_len
and
source_packet
[
encoded_index
]
!=
ETX_CHAR
\
and
source_packet
[
encoded_index
]
!=
STX_CHAR
:
while
(
encoded_index
<
source_len
and
source_packet
[
encoded_index
]
!=
ETX_CHAR
and
source_packet
[
encoded_index
]
!=
STX_CHAR
):
if
source_packet
[
encoded_index
]
==
DLE_CHAR
:
next_byte
=
source_packet
[
encoded_index
+
1
]
if
next_byte
==
DLE_CHAR
:
dest_stream
.
append
(
next_byte
)
else
:
if
(
next_byte
==
0x42
or
next_byte
==
0x43
)
or
(
decode_cr
and
next_byte
==
0x4D
):
if
(
next_byte
==
0x42
or
next_byte
==
0x43
)
or
(
decode_cr
and
next_byte
==
0x4D
):
dest_stream
.
append
(
next_byte
-
0x40
)
else
:
return
DleErrorCodes
.
DECODING_ERROR
,
dest_stream
,
encoded_index
...
...
agent/exit_handler.py
View file @
51cecc0b
...
...
@@ -8,7 +8,7 @@ __KILL_EVENT = threading.Event()
def
keyboard_interrupt_handler
():
logger
=
get_console_logger
()
logger
.
info
(
'
Closing TCP/IP to Serial Agent
'
)
logger
.
info
(
"
Closing TCP/IP to Serial Agent
"
)
class
ServiceExit
(
Exception
):
...
...
@@ -16,12 +16,13 @@ class ServiceExit(Exception):
Custom exception which is used to trigger the clean exit
of all running threads and the main program.
"""
pass
def
service_shutdown
(
signum
,
frame
):
logger
=
get_console_logger
()
logger
.
warning
(
f
'
Caught signal
{
signum
}
'
)
logger
.
warning
(
f
"
Caught signal
{
signum
}
"
)
raise
ServiceExit
...
...
agent/json.py
View file @
51cecc0b
...
...
@@ -7,11 +7,11 @@ from agent.conf import JSON_CONF_FILE
class
JsonKeyNames
(
enum
.
Enum
):
TCP_PORT
=
'
tcp_port
'
SERIAL_BAUDRATE
=
'
serial_baudrate
'
SERIAL_PORT
=
'
serial_port
'
SERIAL_HINT
=
'
serial_hint
'
DEBUG_PACKETS
=
'
debug_packets
'
TCP_PORT
=
"
tcp_port
"
SERIAL_BAUDRATE
=
"
serial_baudrate
"
SERIAL_PORT
=
"
serial_port
"
SERIAL_HINT
=
"
serial_hint
"
DEBUG_PACKETS
=
"
debug_packets
"
def
check_json_file
(
json_cfg_path
:
str
)
->
bool
:
...
...
@@ -20,20 +20,24 @@ def check_json_file(json_cfg_path: str) -> bool:
:return: True if JSON file is valid, False if not and a new one was created at the specified path
"""
logger
=
get_console_logger
()
if
json_cfg_path
==
''
:
if
json_cfg_path
==
""
:
json_cfg_path
=
JSON_CONF_FILE
if
not
os
.
path
.
isfile
(
json_cfg_path
):
with
open
(
json_cfg_path
,
'w'
)
as
file
:
with
open
(
json_cfg_path
,
"w"
)
as
file
:
load_data
=
dict
()
json
.
dump
(
load_data
,
file
)
print
(
f
'Configuration JSON
{
json_cfg_path
}
did not exist, created a new one.'
)
print
(
f
"Configuration JSON
{
json_cfg_path
}
did not exist, created a new one."
)
return
False
else
:
with
open
(
json_cfg_path
,
"r+"
)
as
file
:
try
:
json
.
load
(
file
)
except
json
.
decoder
.
JSONDecodeError
:
logger
.
warning
(
'JSON decode error, file format might be invalid. Replacing JSON'
)
logger
.
warning
(
"JSON decode error, file format might be invalid. Replacing JSON"
)
file
.
flush
()
file
.
truncate
(
0
)
file
.
seek
(
0
)
...
...
@@ -44,16 +48,16 @@ def check_json_file(json_cfg_path: str) -> bool:
def
save_to_json_with_prompt
(
key
:
str
,
value
:
str
,
name
:
str
,
json_cfg_path
:
str
,
json_obj
:
any
key
:
str
,
value
:
str
,
name
:
str
,
json_cfg_path
:
str
,
json_obj
:
any
)
->
bool
:
logger
=
get_console_logger
()
save_to_json
=
input
(
f
'
Do you want to store the
{
name
}
to the configuration file? (y/n):
'
f
"
Do you want to store the
{
name
}
to the configuration file? (y/n):
"
)
if
save_to_json
.
lower
()
in
[
'y'
,
'
yes
'
]:
if
save_to_json
.
lower
()
in
[
"y"
,
"
yes
"
]:
json_obj
[
key
]
=
value
logger
.
info
(
f
'
The
{
name
}
was stored to the JSON file
{
json_cfg_path
}
'
)
logger
.
info
(
'
Delete this file or edit it manually to change it
'
)
logger
.
info
(
f
"
The
{
name
}
was stored to the JSON file
{
json_cfg_path
}
"
)
logger
.
info
(
"
Delete this file or edit it manually to change it
"
)
return
True
return
False
...
...
@@ -62,28 +66,30 @@ def determine_debug_packets(json_cfg_path: str) -> bool:
"""Determine whether debug printout is shown.
:return: Determined serial port
"""
with
open
(
json_cfg_path
,
'
r+
'
)
as
json_file
:
with
open
(
json_cfg_path
,
"
r+
"
)
as
json_file
:
json_obj
=
json
.
load
(
json_file
)
try
:
debug_packets
=
json_obj
[
JsonKeyNames
.
DEBUG_PACKETS
.
value
]
if
debug_packets
==
'
True
'
or
debug_packets
==
'1'
:
if
debug_packets
==
"
True
"
or
debug_packets
==
"1"
:
debug_packets
=
True
else
:
debug_packets
=
False
except
KeyError
:
debug_packets
=
input
(
'
Do you want to display sent and received packets? [y/n]:
'
"
Do you want to display sent and received packets? [y/n]:
"
)
if
debug_packets
is
[
'y'
,
'
yes
'
,
'1'
]:
if
debug_packets
is
[
"y"
,
"
yes
"
,
"1"
]:
debug_packets
=
True
else
:
debug_packets
=
False
cfg_stored
=
save_to_json_with_prompt
(
key
=
JsonKeyNames
.
DEBUG_PACKETS
.
value
,
value
=
str
(
debug_packets
),
json_cfg_path
=
json_cfg_path
,
json_obj
=
json_obj
,
name
=
'debug packets flag'
key
=
JsonKeyNames
.
DEBUG_PACKETS
.
value
,
value
=
str
(
debug_packets
),
json_cfg_path
=
json_cfg_path
,
json_obj
=
json_obj
,
name
=
"debug packets flag"
,
)
if
cfg_stored
:
json_file
.
seek
(
0
)
json
.
dump
(
json_obj
,
json_file
,
indent
=
4
)
return
debug_packets
agent/log.py
View file @
51cecc0b
import
colorlog
import
logging
CONSOLE_LOGGER_NAME
=
'
TCP/IP to Serial Logger
'
CONSOLE_LOGGER_NAME
=
"
TCP/IP to Serial Logger
"
__LOGGER
=
None
def
set_up_logger
()
->
logging
.
Logger
:
normal_format
=
colorlog
.
ColoredFormatter
(
fmt
=
'
%(log_color)s%(levelname)-8s %(cyan)s%(asctime)s.%(msecs)03d %(reset)s%(message)s
'
,
datefmt
=
'
%Y-%m-%d %H:%M:%S
'
fmt
=
"
%(log_color)s%(levelname)-8s %(cyan)s%(asctime)s.%(msecs)03d %(reset)s%(message)s
"
,
datefmt
=
"
%Y-%m-%d %H:%M:%S
"
,
)
normal_handler
=
colorlog
.
StreamHandler
()
normal_handler
.
setFormatter
(
normal_format
)
normal_handler
.
addFilter
(
DebugInfoFilter
())
error_warning_format
=
colorlog
.
ColoredFormatter
(
fmt
=
'
%(log_color)s%(levelname)-8s %(cyan)s%(asctime)s.%(msecs)03d
'
'
[%(filename)s:%(lineno)d] %(reset)s%(message)s
'
,
datefmt
=
'
%Y-%m-%d %H:%M:%S
'
fmt
=
"
%(log_color)s%(levelname)-8s %(cyan)s%(asctime)s.%(msecs)03d
"
"
[%(filename)s:%(lineno)d] %(reset)s%(message)s
"
,
datefmt
=
"
%Y-%m-%d %H:%M:%S
"
,
)
fault_handler
=
colorlog
.
StreamHandler
()
fault_handler
.
setFormatter
(
error_warning_format
)
...
...
@@ -38,8 +38,8 @@ def get_console_logger() -> logging.Logger:
class
DebugInfoFilter
(
logging
.
Filter
):
"""Filter object, which is used so that only INFO and DEBUG messages are printed to stdout.
"""
"""Filter object, which is used so that only INFO and DEBUG messages are printed to stdout.
"""
def
filter
(
self
,
rec
):
if
rec
.
levelno
==
logging
.
INFO
or
rec
.
levelno
==
logging
.
DEBUG
:
return
rec
.
levelno
...
...
agent/serial_handler.py
View file @
51cecc0b
...
...
@@ -27,7 +27,9 @@ class SerialHandler:
self
.
encode_cr
=
encode_cr
def
send
(
self
,
data
:
bytearray
):
encoded_data
=
encode_dle
(
source_packet
=
data
,
add_stx_etx
=
True
,
encode_cr
=
self
.
encode_cr
)
encoded_data
=
encode_dle
(
source_packet
=
data
,
add_stx_etx
=
True
,
encode_cr
=
self
.
encode_cr
)
if
self
.
serial
is
not
None
:
self
.
serial
.
write
(
encoded_data
)
...
...
@@ -39,22 +41,22 @@ class SerialHandler:
self
.
_stop_signal
.
set
()
def
run
(
self
):
if
self
.
serial_port
==
''
:
LOGGER
.
warning
(
'
Invalid serial port, is empty
'
)
if
self
.
serial_port
==
""
:
LOGGER
.
warning
(
"
Invalid serial port, is empty
"
)
self_destruct
()
sys
.
exit
(
1
)
LOGGER
.
info
(
f
'
Starting serial polling task for serial port
{
self
.
serial_port
}
'
)
LOGGER
.
info
(
f
"
Starting serial polling task for serial port
{
self
.
serial_port
}
"
)
try
:
self
.
serial
=
serial
.
Serial
(
port
=
self
.
serial_port
,
baudrate
=
self
.
baud_rate
)
self
.
serial
=
serial
.
Serial
(
port
=
self
.
serial_port
,
baudrate
=
self
.
baud_rate
)
except
serial
.
SerialException
as
e
:
LOGGER
.
exception
(
'Serial port opening failure! Port might already be used..'
)
LOGGER
.
exception
(
"Serial port opening failure! Port might already be used.."
)
self_destruct
()
sys
.
exit
(
1
)
while
True
:
if
self
.
_stop_signal
.
is_set
():
LOGGER
.
warning
(
'
Stop event received, closing SerialHandler..
'
)
LOGGER
.
warning
(
"
Stop event received, closing SerialHandler..
"
)
break
# Poll permanently, but it is possible to join this thread every 200 ms
self
.
serial
.
timeout
=
0.2
...
...
@@ -63,20 +65,22 @@ class SerialHandler:
if
len
(
byte
)
==
1
and
byte
[
0
]
==
STX_CHAR
:
data
.
append
(
byte
[
0
])
self
.
serial
.
timeout
=
0.1
bytes_rcvd
=
self
.
serial
.
read_until
(
serial
.
to_bytes
([
ETX_CHAR
]),
MAX_DLE_FRAME_LEN
)
bytes_rcvd
=
self
.
serial
.
read_until
(
serial
.
to_bytes
([
ETX_CHAR
]),
MAX_DLE_FRAME_LEN
)
if
bytes_rcvd
[
len
(
bytes_rcvd
)
-
1
]
==
ETX_CHAR
:
data
.
extend
(
bytes_rcvd
)
with
acquire_timeout
(
lock
=
self
.
buf_lock
,
timeout
=
DEFAULT_LOCK_TIMEOUT_SECONDS
lock
=
self
.
buf_lock
,
timeout
=
DEFAULT_LOCK_TIMEOUT_SECONDS
)
as
acquired
:
if
not
acquired
:
LOGGER
.
error
(
'
Acquiring lock failed
'
)
LOGGER
.
error
(
"
Acquiring lock failed
"
)
self
.
rec_buf
.
appendleft
(
data
)
elif
len
(
byte
)
>=
1
:
data
.
append
(
byte
[
0
])
data
.
extend
(
self
.
serial
.
read
(
self
.
serial
.
inWaiting
()))
# It is assumed that all packets are DLE encoded, so throw it away for now.
LOGGER
.
info
(
f
'
Non DLE-Encoded data with length
{
len
(
data
)
+
1
}
found..
'
)
LOGGER
.
info
(
f
"
Non DLE-Encoded data with length
{
len
(
data
)
+
1
}
found..
"
)
def
determine_com_port
(
json_cfg_path
:
str
)
->
str
:
...
...
@@ -87,32 +91,39 @@ def determine_com_port(json_cfg_path: str) -> str:
reconfig_com_port
=
False
if
not
check_json_file
(
json_cfg_path
=
json_cfg_path
):
reconfig_com_port
=
True
with
open
(
json_cfg_path
,
'
r+
'
)
as
json_file
:
with
open
(
json_cfg_path
,
"
r+
"
)
as
json_file
:
com_port
=
__det_com_port_with_json_file
(
json_cfg_path
=
json_cfg_path
,
json_file
=
json_file
,
reconfig_com_port
=
reconfig_com_port
json_cfg_path
=
json_cfg_path
,
json_file
=
json_file
,
reconfig_com_port
=
reconfig_com_port
,
)
return
com_port
def
__det_com_port_with_json_file
(
json_cfg_path
:
str
,
json_file
:
TextIO
,
reconfig_com_port
:
bool
json_cfg_path
:
str
,
json_file
:
TextIO
,
reconfig_com_port
:
bool
)
->
str
:
try_hint
=
False
json_obj
=
json
.
load
(
json_file
)
com_port
=
''
com_port
=
""
if
not
reconfig_com_port
:
reconfig_com_port
,
try_hint
,
com_port
=
__try_com_port_load
(
json_obj
=
json_obj
)
if
try_hint
:
reconfig_com_port
,
com_port
=
__try_hint_handling
(
json_cfg_path
=
json_cfg_path
,
reconfig_com_port
=
reconfig_com_port
,
json_obj
=
json_obj
json_cfg_path
=
json_cfg_path
,
reconfig_com_port
=
reconfig_com_port
,
json_obj
=
json_obj
,
)
if
reconfig_com_port
:
com_port
=
prompt_com_port
()
save_to_json_with_prompt
(
key
=
JsonKeyNames
.
SERIAL_PORT
.
value
,
value
=
com_port
,
name
=
'serial port'
,
json_cfg_path
=
json_cfg_path
,
json_obj
=
json_obj
key
=
JsonKeyNames
.
SERIAL_PORT
.
value
,
value
=
com_port
,
name
=
"serial port"
,
json_cfg_path
=
json_cfg_path
,
json_obj
=
json_obj
,
)
json_file
.
seek
(
0
)
json
.
dump
(
json_obj
,
json_file
,
indent
=
4
)
...
...
@@ -122,22 +133,22 @@ def __det_com_port_with_json_file(
def
__try_com_port_load
(
json_obj
)
->
(
bool
,
bool
,
str
):
reconfig_com_port
=
False
try_hint
=
False
com_port
=
''
com_port
=
""
try
:
com_port
=
json_obj
[
JsonKeyNames
.
SERIAL_PORT
.
value
]
LOGGER
.
info
(
f
'
Loaded COM port
{
com_port
}
from JSON configuration file
'
)
LOGGER
.
info
(
f
"
Loaded COM port
{
com_port
}
from JSON configuration file
"
)
if
not
check_port_validity
(
com_port
):
while
True
:
reconfigure
=
input
(
'
COM port from configuration file not contained within serial
'
'
port list. Reconfigure serial port or try to determine from hint?
'
'
[r (reconfigure) / h (hint) / c(cancel)]:
'
"
COM port from configuration file not contained within serial
"
"
port list. Reconfigure serial port or try to determine from hint?
"
"
[r (reconfigure) / h (hint) / c(cancel)]:
"
)
if
reconfigure
.
lower
()
in
[
'r'
]:
if
reconfigure
.
lower
()
in
[
"r"
]:
reconfig_com_port
=
True
elif
reconfigure
.
lower
()
in
[
'h'
]:
elif
reconfigure
.
lower
()
in
[
"h"
]:
try_hint
=
True
elif
reconfigure
.
lower
()
in
[
'c'
]:
elif
reconfigure
.
lower
()
in
[
"c"
]:
return
com_port
else
:
continue
...
...
@@ -146,7 +157,9 @@ def __try_com_port_load(json_obj) -> (bool, bool, str):
return
reconfig_com_port
,
try_hint
,
com_port
def
__try_hint_handling
(
json_cfg_path
:
str
,
reconfig_com_port
:
bool
,
json_obj
)
->
(
bool
,
str
):
def
__try_hint_handling
(
json_cfg_path
:
str
,
reconfig_com_port
:
bool
,
json_obj
)
->
(
bool
,
str
):
reconfig_hint
=
False
try
:
hint
=
json_obj
[
JsonKeyNames
.
SERIAL_HINT
.
value
]
...
...
@@ -158,70 +171,75 @@ def __try_hint_handling(json_cfg_path: str, reconfig_com_port: bool, json_obj) -
LOGGER
.
info
(
f
'Found
{
com_port
}
based on hint "
{
hint
}
"'
)
if
reconfig_hint
:
if
save_to_json_with_prompt
(
key
=
JsonKeyNames
.
SERIAL_PORT
.
value
,
value
=
com_port
,
name
=
'serial port'
,
json_cfg_path
=
json_cfg_path
,
json_obj
=
json_obj
key
=
JsonKeyNames
.
SERIAL_PORT
.
value
,
value
=
com_port
,
name
=
"serial port"
,
json_cfg_path
=
json_cfg_path
,
json_obj
=
json_obj
,
):
reconfig_com_port
=
False
else
:
LOGGER
.
info
(
'
No COM port found based on hint..
'
)
LOGGER
.
info
(
"
No COM port found based on hint..
"
)
reconfig_com_port
=
True
return
reconfig_com_port
,
com_port
def
__prompt_hint_handling
(
json_obj
)
->
(
bool
,
str
):
reconfig_hint
=
False
hint
=
''
hint
=
""
ports
=
serial
.
tools
.
list_ports
.
comports
()
prompt_hint
=
input
(
'
No hint found in config JSON. Do you want to print the list of devices
'
'
and then specify a hint based on it? [y/n]:
'
"
No hint found in config JSON. Do you want to print the list of devices
"
"
and then specify a hint based on it? [y/n]:
"
)
if
prompt_hint
.
lower
()
in
[
'y'
,
'
yes
'
,
'1'
]:
LOGGER
.
info
(
'
Found serial devices:
'
)
if
prompt_hint
.
lower
()
in
[
"y"
,
"
yes
"
,
"1"
]:
LOGGER
.
info
(
"
Found serial devices:
"
)
for
port
,
desc
,
hwid
in
sorted
(
ports
):
print
(
'
{}: {} [{}]
'
.
format
(
port
,
desc
,
hwid
))
print
(
"
{}: {} [{}]
"
.
format
(
port
,
desc
,
hwid
))
while
True
:
hint
=
input
(
'
Specify hint:
'
)
hint
=
input
(
"
Specify hint:
"
)
save_to_json
=
input
(
'
Do you want to store the hint to the configuration file or
'
'
specify a new one? (y/r):
'
"
Do you want to store the hint to the configuration file or
"
"
specify a new one? (y/r):
"
)
if
save_to_json
in
[
'y'
,
'
yes
'
,
'1'
]:
if
save_to_json
in
[
"y"
,
"
yes
"
,
"1"
]:
json_obj
[
JsonKeyNames
.
SERIAL_HINT
.
value
]
=
hint
reconfig_hint
=
True
break
elif
save_to_json
in
[
'r'
]:
elif
save_to_json
in
[
"r"
]:
continue
return
reconfig_hint
,
hint
def
find_com_port_from_hint
(
hint
:
str
)
->
(
bool
,
str
):
"""Find a COM port based on a hint string"""
if
hint
==
''
:
LOGGER
.
warning
(
'
Invalid hint, is empty..
'
)
return
False
,
''
if
hint
==
""
:
LOGGER
.
warning
(
"
Invalid hint, is empty..
"
)
return
False
,
""
ports
=
serial
.
tools
.
list_ports
.
comports
()
for
port
,
desc
,
hwid
in
sorted
(
ports
):
if
hint
in
desc
:
return
True
,
port
return
False
,
''
return
False
,
""
def
prompt_com_port
()
->
str
:
while
True
:
com_port
=
input
(
'
Configuring serial port. Please enter COM Port
'
'
(enter h to display list of COM ports):
'
"
Configuring serial port. Please enter COM Port
"
"
(enter h to display list of COM ports):
"
)
if
com_port
==
'h'
:
if
com_port
==
"h"
:
ports
=
serial
.
tools
.
list_ports
.
comports
()
for
port
,
desc
,
hwid
in
sorted
(
ports
):
print
(
'
{}: {} [{}]
'
.
format
(
port
,
desc
,
hwid
))
print
(
"
{}: {} [{}]
"
.
format
(
port
,
desc
,
hwid
))
else
:
if
not
check_port_validity
(
com_port
):
print
(
'Serial port not in list of available serial ports. Try again? [y/n]'
)
print
(
"Serial port not in list of available serial ports. Try again? [y/n]"
)
try_again
=
input
()
if
try_again
.
lower
()
in
[
'y'
,
'
yes
'
]:
if
try_again
.
lower
()
in
[
"y"
,
"
yes
"
]:
continue
else
:
break
...
...
agent/tcp_handler.py
View file @
51cecc0b
...
...
@@ -24,7 +24,7 @@ class TcpHandler:
self
.
port
=
port
self
.
_stop_signal
=
threading
.
Event
()
self
.
debug_packets
=
debug_packets
self
.
server_addr
=
(
'
localhost
'
,
port
)
self
.
server_addr
=
(
"
localhost
"
,
port
)
self
.
rec_buf
=
get_reception_buffer
()
self
.
buf_lock
=
get_buffer_lock
()
...
...
@@ -43,13 +43,11 @@ class TcpHandler:
def
__tcp_loop
(
self
)
->
bool
:
if
self
.
_stop_signal
.
is_set
():
LOGGER
.
warning
(
'
Stop signal received, closing TCP handler
'
)
LOGGER
.
warning
(
"
Stop signal received, closing TCP handler
"
)
return
False
inputs
=
[
self
.
tcp_socket
]
outputs
=
[]
readable
,
writable
,
execptional
=
select
.
select
(
inputs
,
outputs
,
inputs
,
0.5
)
readable
,
writable
,
execptional
=
select
.
select
(
inputs
,
outputs
,
inputs
,
0.5
)
for
sock
in
readable
:
self
.
__handle_tcp_connection
(
sock
)
return
True
...
...
@@ -59,13 +57,13 @@ class TcpHandler:
try
:
self
.
__tcp_conn_core_handler
(
conn
,
addr
)
except
ConnectionResetError
:
LOGGER
.
exception
(
'
ConnectionResetError. TCP server might not be up
'
)
LOGGER
.
exception
(
"
ConnectionResetError. TCP server might not be up
"
)
def
__tcp_conn_core_handler
(
self
,
conn
:
socket
.
socket
,
address
:
any
):
if
address
:
pass
# Commented for now, a lot of spam..
LOGGER
.
info
(
f
'
Connection with
{
address
}
established
'
)
LOGGER
.
info
(
f
"
Connection with
{
address
}
established
"
)
tc_queue
=
deque
()
conn
.
setblocking
(
False
)
conn
.
settimeout
(
0
)
...
...
@@ -78,16 +76,20 @@ class TcpHandler:
self
.
__tcp_conn_handle_send_recv
(
conn
=
conn
,
tc_queue
=
tc_queue
)
break
else
:
LOGGER
.
warning
(
'
Unexpected recv result
'
)
LOGGER
.
warning
(
"
Unexpected recv result
"
)
break
except
BlockingIOError
:
op_performed
=
self
.
__tcp_conn_handle_send_recv
(
conn
=
conn
,
tc_queue
=
tc_queue
)
op_performed
=
self
.
__tcp_conn_handle_send_recv
(
conn
=
conn
,
tc_queue
=
tc_queue
)
if
not
op_performed
:
time
.
sleep
(
0.4
)
# All operations done, shut down connection
conn
.
shutdown
(
socket
.
SHUT_RDWR
)
def
__tcp_conn_handle_send_recv
(
self
,
conn
:
socket
.
socket
,
tc_queue
:
Deque
[
bytearray
])
->
bool
:
def
__tcp_conn_handle_send_recv
(
self
,
conn
:
socket
.
socket
,
tc_queue
:
Deque
[
bytearray
]
)
->
bool
:
tc_list
=
parse_space_packets
(
analysis_queue
=
tc_queue
,
packet_ids
=
(
PUS_TC_SPACE_PACKET_ID
,)
)
...
...
@@ -99,15 +101,16 @@ class TcpHandler:
self
.
serial_handler
.
send
(
data
=
tc_packet
)
with
acquire_timeout
(
self
.
buf_lock
,
timeout
=
0.4
)
as
acquired
:
if
not
acquired
:
LOGGER
.
error
(
'
Acquiring lock failed
'
)
LOGGER
.
error
(
"
Acquiring lock failed
"
)
while
self
.
rec_buf
:
tc_handled
=
True
tm_packet_encoded
=
self
.
rec_buf
.
pop
()
code
,
tm_packet
,
read_len
=
decode_dle
(
source_packet
=
tm_packet_encoded
,
decode_cr
=
self
.
serial_handler
.
encode_cr
source_packet
=
tm_packet_encoded
,
decode_cr
=
self
.
serial_handler
.
encode_cr
,
)
if
code
==
DleErrorCodes
.
DECODING_ERROR
:
LOGGER
.
warning
(
'
DLE decoding error occurred
'
)
LOGGER
.
warning
(
"
DLE decoding error occurred
"
)
elif
code
==
DleErrorCodes
.
OK
:
# Now sent back telemetry received from serial interface
if
self
.
debug_packets
:
...
...
@@ -117,7 +120,7 @@ class TcpHandler:
def
parse_space_packets
(
analysis_queue
:
Deque
[
bytearray
],
packet_ids
:
Tuple
[
int
]
analysis_queue
:
Deque
[
bytearray
],
packet_ids
:
Tuple
[
int
]
)
->
List
[
bytearray
]:
"""Given a deque of bytearrays, parse for space packets. Any broken headers will be removed.
If a packet is detected and the
...
...
@@ -140,12 +143,15 @@ def parse_space_packets(
while
True
:
if
current_idx
+
1
>=
len
(
concatenated_packets
):
break
current_packet_id
=
\
(
concatenated_packets
[
current_idx
]
<<
8
)
|
concatenated_packets
[
current_idx
+
1
]
current_packet_id
=
(
concatenated_packets
[
current_idx
]
<<
8
)
|
concatenated_packets
[
current_idx
+
1
]
if
current_packet_id
in
packet_ids
:
result
,
current_idx
=
__handle_packet_id_match
(
concatenated_packets
=
concatenated_packets
,
analysis_queue
=
analysis_queue
,
current_idx
=
current_idx
,
tm_list
=
tm_list
concatenated_packets
=
concatenated_packets
,