Browse Source

Tools: add autotest for checking GPS ordering

c415-sdk
bugobliterator 4 years ago committed by Randy Mackay
parent
commit
48277b43e4
  1. 31
      Tools/autotest/arducopter.py
  2. 25
      Tools/autotest/autotest.py
  3. 37
      Tools/autotest/common.py
  4. 46
      Tools/autotest/pysim/util.py

31
Tools/autotest/arducopter.py

@ -10,6 +10,7 @@ from __future__ import print_function @@ -10,6 +10,7 @@ from __future__ import print_function
import copy
import math
import os
import signal
import shutil
import time
import numpy
@ -2100,7 +2101,37 @@ class AutoTestCopter(AutoTest): @@ -2100,7 +2101,37 @@ class AutoTestCopter(AutoTest):
def fly_auto_test_using_can_gps(self):
self.set_parameter("CAN_P1_DRIVER", 1)
self.set_parameter("GPS_TYPE", 9)
self.set_parameter("GPS_TYPE2", 9)
self.set_parameter("SIM_GPS2_DISABLE", 0)
os.kill(self.sup_prog[0].pid, signal.SIGSTOP)
os.kill(self.sup_prog[1].pid, signal.SIGSTOP)
self.reboot_sitl()
# Test UAVCAN GPS ordering working
os.kill(self.sup_prog[0].pid, signal.SIGCONT)
gps1_det_text = self.wait_text("GPS 1: specified as UAVCAN")
os.kill(self.sup_prog[1].pid, signal.SIGCONT)
gps2_det_text = self.wait_text("GPS 2: specified as UAVCAN")
gps1_nodeid = int(gps1_det_text.split('-')[1])
gps2_nodeid = int(gps2_det_text.split('-')[1])
if gps1_nodeid is None or gps2_nodeid is None:
raise NotAchievedException("GPS not ordered per the order of Node IDs")
self.set_parameter("GPS1_CAN_OVRIDE", gps2_nodeid)
self.set_parameter("GPS2_CAN_OVRIDE", gps1_nodeid)
# Reboot the SITL, and shuffle the AP_Periph
os.kill(self.sup_prog[0].pid, signal.SIGSTOP)
os.kill(self.sup_prog[1].pid, signal.SIGSTOP)
self.drain_mav()
self.reboot_sitl()
try:
# order should have changed this time
os.kill(self.sup_prog[0].pid, signal.SIGCONT)
gps1_det_text = self.wait_text("GPS 2: specified as UAVCAN")
os.kill(self.sup_prog[1].pid, signal.SIGCONT)
gps2_det_text = self.wait_text("GPS 1: specified as UAVCAN")
except:
raise NotAchievedException("GPS not ordered as requested")
self.fly_auto_test()
def fly_motor_fail(self, fail_servo=0, fail_mul=0.0, holdtime=30):

25
Tools/autotest/autotest.py

@ -368,7 +368,7 @@ tester_class_map = { @@ -368,7 +368,7 @@ tester_class_map = {
}
suplementary_test_binary_map = {
"test.CAN": "sitl_periph_gps.AP_Periph",
"test.CAN": ["sitl_periph_gps.AP_Periph", "sitl_periph_gps.AP_Periph.1"],
}
@ -454,19 +454,24 @@ def run_step(step): @@ -454,19 +454,24 @@ def run_step(step):
if step.startswith("defaults"):
vehicle = step[9:]
return get_default_params(vehicle, binary)
supplementary_binaries = []
if step in suplementary_test_binary_map:
config_name = suplementary_test_binary_map[step].split('.')[0]
binary_name = suplementary_test_binary_map[step].split('.')[1]
supplementary_binary = util.reltopdir(os.path.join('build',
config_name,
'bin',
binary_name))
for supplementary_test_binary in suplementary_test_binary_map[step]:
config_name = supplementary_test_binary.split('.')[0]
binary_name = supplementary_test_binary.split('.')[1]
instance_num = 0
if len(supplementary_test_binary.split('.')) >= 3:
instance_num = int(supplementary_test_binary.split('.')[2])
supplementary_binaries.append([util.reltopdir(os.path.join('build',
config_name,
'bin',
binary_name)),
'-I {}'.format(instance_num)])
# we are running in conjunction with a supplementary app
# can't have speedup
opts.speedup = 1.0
else:
supplementary_binary = None
supplementary_binaries = []
fly_opts = {
"viewerip": opts.viewerip,
"use_map": opts.map,
@ -480,7 +485,7 @@ def run_step(step): @@ -480,7 +485,7 @@ def run_step(step):
"_show_test_timings": opts.show_test_timings,
"force_ahrs_type": opts.force_ahrs_type,
"logs_dir": buildlogs_dirpath(),
"sup_binary": supplementary_binary,
"sup_binaries": supplementary_binaries,
}
if opts.speedup is not None:
fly_opts["speedup"] = opts.speedup

37
Tools/autotest/common.py

@ -1210,7 +1210,7 @@ class AutoTest(ABC): @@ -1210,7 +1210,7 @@ class AutoTest(ABC):
_show_test_timings=False,
logs_dir=None,
force_ahrs_type=None,
sup_binary=None):
sup_binaries=[]):
self.start_time = time.time()
global __autotest__ # FIXME; make progress a non-staticmethod
@ -1229,7 +1229,7 @@ class AutoTest(ABC): @@ -1229,7 +1229,7 @@ class AutoTest(ABC):
self.breakpoints = breakpoints
self.disable_breakpoints = disable_breakpoints
self.speedup = speedup
self.sup_binary = sup_binary
self.sup_binaries = sup_binaries
self.mavproxy = None
self.mav = None
@ -5394,7 +5394,7 @@ Also, ignores heartbeats not from our target system''' @@ -5394,7 +5394,7 @@ Also, ignores heartbeats not from our target system'''
raise AutoTestTimeoutException("Failed to get EKF.flags=%u disabled" % not_required_value)
def wait_text(self, *args, **kwargs):
self.wait_statustext(*args, **kwargs)
return self.wait_statustext(*args, **kwargs)
def statustext_in_collections(self, text, regex=False):
c = self.context_get()
@ -5421,13 +5421,15 @@ Also, ignores heartbeats not from our target system''' @@ -5421,13 +5421,15 @@ Also, ignores heartbeats not from our target system'''
if check_context:
if self.statustext_in_collections(text, regex=regex):
self.progress("Found expected text in collection: %s" % text.lower())
return
return text
global statustext_found
global statustext_full
statustext_found = False
def mh(mav, m):
global statustext_found
global statustext_full
if m.get_type() != "STATUSTEXT":
return
if regex:
@ -5437,6 +5439,8 @@ Also, ignores heartbeats not from our target system''' @@ -5437,6 +5439,8 @@ Also, ignores heartbeats not from our target system'''
if text.lower() in m.text.lower():
self.progress("Received expected text: %s" % m.text.lower())
statustext_found = True
statustext_full = m.text
self.install_message_hook(mh)
if wallclock_timeout:
tstart = time.time()
@ -5456,6 +5460,8 @@ Also, ignores heartbeats not from our target system''' @@ -5456,6 +5460,8 @@ Also, ignores heartbeats not from our target system'''
self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=0.1)
finally:
self.remove_message_hook(mh)
if statustext_found:
return statustext_full
def get_mavlink_connection_going(self):
# get a mavlink connection going
@ -5720,12 +5726,17 @@ Also, ignores heartbeats not from our target system''' @@ -5720,12 +5726,17 @@ Also, ignores heartbeats not from our target system'''
self.progress("Starting SITL", send_statustext=False)
self.sitl = util.start_SITL(self.binary, **start_sitl_args)
self.expect_list_add(self.sitl)
if self.sup_binary is not None:
self.progress("Starting Supplementary Program")
self.sup_prog = util.start_SITL(self.sup_binary, **start_sitl_args)
self.expect_list_add(self.sup_prog)
else:
self.sup_prog = None
self.sup_prog = []
for sup_binary in self.sup_binaries:
self.progress("Starting Supplementary Program ", sup_binary)
start_sitl_args["customisations"] = [sup_binary[1]]
start_sitl_args["supplementary"] = True
sup_prog_link = util.start_SITL(sup_binary[0], **start_sitl_args)
self.sup_prog.append(sup_prog_link)
self.expect_list_add(sup_prog_link)
def get_suplementary_programs(self):
return self.sup_prog
def sitl_is_running(self):
if self.sitl is None:
@ -5755,6 +5766,12 @@ Also, ignores heartbeats not from our target system''' @@ -5755,6 +5766,12 @@ Also, ignores heartbeats not from our target system'''
util.expect_setup_callback(self.mavproxy, self.expect_callback)
self.expect_list_clear()
if len(self.sup_prog):
self.expect_list_extend([self.sitl, self.mavproxy])
else:
self.expect_list_extend([self.sitl, self.mavproxy] + self.sup_prog)
# need to wait for a heartbeat to arrive as then mavutil will
# select the correct set of messages for us to receive in
# self.mav.messages. You can actually recieve messages with

46
Tools/autotest/pysim/util.py

@ -261,9 +261,10 @@ def start_SITL(binary, @@ -261,9 +261,10 @@ def start_SITL(binary,
breakpoints=[],
disable_breakpoints=False,
customisations=[],
lldb=False):
lldb=False,
supplementary=False):
if model is None:
if model is None and not supplementary:
raise ValueError("model must not be None")
"""Launch a SITL instance."""
@ -333,27 +334,28 @@ def start_SITL(binary, @@ -333,27 +334,28 @@ def start_SITL(binary,
raise RuntimeError("DISPLAY was not set")
cmd.append(binary)
if wipe:
cmd.append('-w')
if synthetic_clock:
cmd.append('-S')
if home is not None:
cmd.extend(['--home', home])
cmd.extend(['--model', model])
if speedup != 1:
cmd.extend(['--speedup', str(speedup)])
if defaults_filepath is not None:
if type(defaults_filepath) == list:
if len(defaults_filepath):
cmd.extend(['--defaults', ",".join(defaults_filepath)])
else:
cmd.extend(['--defaults', defaults_filepath])
if unhide_parameters:
cmd.extend(['--unhide-groups'])
cmd.extend(customisations)
if not supplementary:
if wipe:
cmd.append('-w')
if synthetic_clock:
cmd.append('-S')
if home is not None:
cmd.extend(['--home', home])
cmd.extend(['--model', model])
if speedup != 1:
cmd.extend(['--speedup', str(speedup)])
if defaults_filepath is not None:
if type(defaults_filepath) == list:
if len(defaults_filepath):
cmd.extend(['--defaults', ",".join(defaults_filepath)])
else:
cmd.extend(['--defaults', defaults_filepath])
if unhide_parameters:
cmd.extend(['--unhide-groups'])
# somewhere for MAVProxy to connect to:
cmd.append('--uartC=tcp:2')
# somewhere for MAVProxy to connect to:
cmd.append('--uartC=tcp:2')
cmd.extend(customisations)
if (gdb or lldb) and sys.platform == "darwin" and os.getenv('DISPLAY'):
global windowID

Loading…
Cancel
Save