|
|
|
@ -126,6 +126,7 @@ class Context(object):
@@ -126,6 +126,7 @@ class Context(object):
|
|
|
|
|
def __init__(self): |
|
|
|
|
self.parameters = [] |
|
|
|
|
self.sitl_commandline_customised = False |
|
|
|
|
self.message_hooks = [] |
|
|
|
|
|
|
|
|
|
# https://stackoverflow.com/questions/616645/how-do-i-duplicate-sys-stdout-to-a-log-file-in-python |
|
|
|
|
class TeeBoth(object): |
|
|
|
@ -2142,6 +2143,12 @@ class AutoTest(ABC):
@@ -2142,6 +2143,12 @@ class AutoTest(ABC):
|
|
|
|
|
def install_message_hook(self, hook): |
|
|
|
|
self.mav.message_hooks.append(hook) |
|
|
|
|
|
|
|
|
|
def install_message_hook_context(self, hook): |
|
|
|
|
'''installs a message hook which will be removed when the context goes |
|
|
|
|
away''' |
|
|
|
|
self.mav.message_hooks.append(hook) |
|
|
|
|
self.context_get().message_hooks.append(hook) |
|
|
|
|
|
|
|
|
|
def remove_message_hook(self, hook): |
|
|
|
|
oldlen = len(self.mav.message_hooks) |
|
|
|
|
self.mav.message_hooks = list(filter(lambda x : x != hook, |
|
|
|
@ -2938,6 +2945,8 @@ class AutoTest(ABC):
@@ -2938,6 +2945,8 @@ class AutoTest(ABC):
|
|
|
|
|
self.set_parameter(name, |
|
|
|
|
old_value, |
|
|
|
|
add_to_context=False) |
|
|
|
|
for hook in dead.message_hooks: |
|
|
|
|
self.remove_message_hook(hook) |
|
|
|
|
|
|
|
|
|
class Context(object): |
|
|
|
|
def __init__(self, testsuite): |
|
|
|
@ -3841,18 +3850,32 @@ Also, ignores heartbeats not from our target system'''
@@ -3841,18 +3850,32 @@ Also, ignores heartbeats not from our target system'''
|
|
|
|
|
self.wait_statustext(*args, **kwargs) |
|
|
|
|
|
|
|
|
|
def wait_statustext(self, text, timeout=20, the_function=None): |
|
|
|
|
"""Wait a specific STATUS_TEXT.""" |
|
|
|
|
"""Wait for a specific STATUSTEXT.""" |
|
|
|
|
|
|
|
|
|
# Statustexts are often triggered by something we've just |
|
|
|
|
# done, so we have to be careful not to read any traffic that |
|
|
|
|
# isn't checked for being our statustext. That doesn't work |
|
|
|
|
# well with getting the curent simulation time (which requires |
|
|
|
|
# a new SYSTEM_TIME message), so we install a message hook |
|
|
|
|
# which checks all incoming messages. |
|
|
|
|
self.progress("Waiting for text : %s" % text.lower()) |
|
|
|
|
global statustext_found |
|
|
|
|
statustext_found = False |
|
|
|
|
def mh(mav, m): |
|
|
|
|
global statustext_found |
|
|
|
|
if m.get_type() != "STATUSTEXT": |
|
|
|
|
return |
|
|
|
|
if text.lower() in m.text.lower(): |
|
|
|
|
self.progress("Received expected text : %s" % m.text.lower()) |
|
|
|
|
statustext_found = True |
|
|
|
|
self.install_message_hook_context(mh) |
|
|
|
|
tstart = self.get_sim_time() |
|
|
|
|
while self.get_sim_time_cached() < tstart + timeout: |
|
|
|
|
if statustext_found: |
|
|
|
|
return |
|
|
|
|
if the_function is not None: |
|
|
|
|
the_function() |
|
|
|
|
m = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=0.1) |
|
|
|
|
if m is None: |
|
|
|
|
continue |
|
|
|
|
if text.lower() in m.text.lower(): |
|
|
|
|
self.progress("Received expected text : %s" % m.text.lower()) |
|
|
|
|
return True |
|
|
|
|
raise AutoTestTimeoutException("Failed to received text : %s" % |
|
|
|
|
text.lower()) |
|
|
|
|
|
|
|
|
|