Browse Source

mavsdk_tests: print overall results

sbg
Julian Oes 5 years ago committed by Nuno Marques
parent
commit
8283d19682
  1. 1
      test/mavsdk_tests/logger_helper.py
  2. 274
      test/mavsdk_tests/mavsdk_test_runner.py
  3. 3
      test/mavsdk_tests/process_helper.py

1
test/mavsdk_tests/logger_helper.py

@ -15,6 +15,7 @@ class color(Enum): @@ -15,6 +15,7 @@ class color(Enum):
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
GRAY = '\033[90m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
RESET = '\033[0m'

274
test/mavsdk_tests/mavsdk_test_runner.py

@ -9,9 +9,11 @@ import subprocess @@ -9,9 +9,11 @@ import subprocess
import sys
from logger_helper import color, colorize
import process_helper as ph
from typing import Any, Dict, List, NoReturn, Optional
from types import FrameType
def main():
def main() -> NoReturn:
parser = argparse.ArgumentParser()
parser.add_argument("--log-dir",
@ -61,30 +63,17 @@ def main(): @@ -61,30 +63,17 @@ def main():
)
signal.signal(signal.SIGINT, tester.sigint_handler)
sys.exit(tester.run())
sys.exit(0 if tester.run() else 1)
def determine_tests(filter):
cmd = os.getcwd() + "/build/px4_sitl_default/mavsdk_tests/mavsdk_tests"
args = ["--list-test-names-only", filter]
p = subprocess.Popen(
[cmd] + args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
assert p.stdout is not None
tests = str(p.stdout.read().decode("utf-8")).strip().split('\n')
return tests
def is_running(process_name):
def is_running(process_name: str) -> bool:
for proc in psutil.process_iter(attrs=['name']):
if proc.info['name'] == process_name:
return True
return False
def is_everything_ready(config):
def is_everything_ready(config: Dict[str, str]) -> bool:
result = True
if config['mode'] == 'sitl':
@ -118,115 +107,158 @@ def is_everything_ready(config): @@ -118,115 +107,158 @@ def is_everything_ready(config):
class Tester:
def __init__(self,
config,
iterations,
abort_early,
speed_factor,
model,
debugger,
log_dir,
gui,
verbose):
self.active_runners = []
config: Dict[str, Any],
iterations: int,
abort_early: bool,
speed_factor: float,
model: str,
debugger: str,
log_dir: str,
gui: bool,
verbose: bool):
self.config = config
self.active_runners: List[ph.Runner]
self.iterations = iterations
self.abort_early = abort_early
self.config = config
self.tests = self.determine_tests(config['tests'], model)
self.speed_factor = speed_factor
self.model = model
self.debugger = debugger
self.log_dir = log_dir
self.gui = gui
self.verbose = verbose
def run(self):
overall_success = True
@classmethod
def determine_tests(cls, tests: List[Dict[str, Any]], model: str) \
-> List[Dict[str, Any]]:
for test in tests:
test['selected'] = (model == 'all') or model == test['model']
test['cases'] = dict.fromkeys(
cls.determine_test_cases(test['test_filter']))
return tests
@staticmethod
def determine_test_cases(filter: str) -> List[str]:
cmd = os.getcwd() + "/build/px4_sitl_default/mavsdk_tests/mavsdk_tests"
args = ["--list-test-names-only", filter]
p = subprocess.Popen(
[cmd] + args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
assert p.stdout is not None
cases = str(p.stdout.read().decode("utf-8")).strip().split('\n')
return cases
@staticmethod
def plural_s(n_items: int) -> str:
if n_items > 1:
return "s"
else:
return ""
def run(self) -> bool:
self.show_plans()
self.prepare_for_results()
self.run_tests()
self.show_detailed_results()
self.show_overall_result()
return self.was_overall_pass()
def show_plans(self) -> None:
n_models = sum(map(
lambda test: 1 if test['selected'] else 0,
self.tests))
n_cases = sum(map(
lambda test: len(test['cases']) if test['selected'] else 0,
self.tests))
print()
print(colorize(
"About to run {} test case{} for {} selected model{} "
"({} iteration{}):".
format(n_cases, self.plural_s(n_cases),
n_models, self.plural_s(n_models),
self.iterations, self.plural_s(self.iterations)),
color.BOLD))
for test in self.tests:
if not test['selected']:
print(colorize(colorize(
" - {} (not selected)".format(test['model']),
color.BOLD), color.GRAY))
continue
print(colorize(" - {}:".format(test['model']), color.BOLD))
for case in test['cases']:
print(" - '{}'".format(case))
print()
def prepare_for_results(self) -> None:
# prepare dict entry for results
for test in self.tests:
if not test['selected']:
continue
for key in test['cases'].keys():
test['cases'][key] = {'results': []}
def run_tests(self) -> None:
for iteration in range(self.iterations):
if self.iterations > 1:
print("Test iteration: {} / {}".
format(iteration + 1, self.iterations))
was_success = self.run_test_group()
if not was_success:
overall_success = False
if self.iterations > 1 and not was_success and self.abort_early:
print("Aborting with a failure in test run {}/{}".
print("%%% Test iteration: {} / {}".
format(iteration + 1, self.iterations))
break
if overall_success:
print(colorize(colorize(
"Overall result: success!", color.GREEN), color.BOLD))
return 0
else:
print(colorize(colorize(
"Overall result: failure!", color.RED), color.BOLD))
return 1
success = self.run_iteration()
if not success:
if self.abort_early:
break
def run_test_group(self):
overall_success = True
def run_iteration(self) -> bool:
iteration_success = True
tests = self.config["tests"]
for test in self.tests:
if not test['selected']:
continue
if self.model == 'all':
models = tests
else:
found = False
for elem in tests:
if elem['model'] == self.model:
models = [elem]
found = True
if not found:
print("Specified model is not defined")
models = []
for group in models:
print(colorize(
"==> Running tests for '{}' with filter '{}'"
.format(group['model'], group['test_filter']),
"==> Running tests for {}".format(test['model']),
color.BOLD))
tests = determine_tests(group['test_filter'])
for i, test in enumerate(tests):
print("--> Test {} of {}: '{}' running ...".
format(i+1, len(tests), test))
was_success = self.run_test(test, group)
for i, case in enumerate(test['cases']):
print("--> Test case {} of {}: '{}' running ...".
format(i+1, len(test['cases']), case))
was_success = self.run_test_case(test, case)
if was_success:
print(colorize(
"--- Test {} of {}: '{}' succeeded."
.format(i+1, len(tests), test),
"--- Test case {} of {}: '{}' succeeded."
.format(i+1, len(test['cases']), case),
color.GREEN))
else:
print(colorize(
"--- Test {} of {}: '{}' failed."
.format(i+1, len(tests), test),
"--- Test case {} of {}: '{}' failed."
.format(i+1, len(test['cases']), case),
color.RED))
# TODO: now print the test output
if not was_success:
overall_success = False
iteration_success = False
if not was_success and self.abort_early:
print("Aborting early")
return False
return overall_success
return iteration_success
def run_test(self, test, group):
def run_test_case(self, test: Dict[str, Any], case: str) -> bool:
self.active_runners = []
speed_factor = self.speed_factor
if "max_speed_factor" in group:
speed_factor = min(int(speed_factor), group["max_speed_factor"])
if "max_speed_factor" in test:
speed_factor = min(float(speed_factor), test["max_speed_factor"])
if self.config['mode'] == 'sitl':
px4_runner = ph.Px4Runner(
os.getcwd(),
self.log_dir,
group,
test,
speed_factor,
self.debugger,
self.verbose)
@ -237,7 +269,7 @@ class Tester: @@ -237,7 +269,7 @@ class Tester:
gzserver_runner = ph.GzserverRunner(
os.getcwd(),
self.log_dir,
group,
test,
self.speed_factor,
self.verbose)
gzserver_runner.start()
@ -247,7 +279,7 @@ class Tester: @@ -247,7 +279,7 @@ class Tester:
gzclient_runner = ph.GzclientRunner(
os.getcwd(),
self.log_dir,
group,
test,
self.verbose)
gzclient_runner.start()
self.active_runners.append(gzclient_runner)
@ -255,15 +287,15 @@ class Tester: @@ -255,15 +287,15 @@ class Tester:
test_runner = ph.TestRunner(
os.getcwd(),
self.log_dir,
group,
test,
case,
self.config['mavlink_connection'],
self.verbose)
test_runner.start()
self.active_runners.append(test_runner)
while test_runner.time_elapsed_s() < group['timeout_min']*60:
while test_runner.time_elapsed_s() < test['timeout_min']*60:
returncode = test_runner.poll()
if returncode is not None:
is_success = (returncode == 0)
@ -276,22 +308,78 @@ class Tester: @@ -276,22 +308,78 @@ class Tester:
else:
print(colorize(
"Test timeout of {} mins triggered!".
format(group['timeout_min']),
format(test['timeout_min']),
color.BOLD))
is_success = False
for runner in self.active_runners:
runner.stop()
test['cases'][case]['results'].append(is_success)
return is_success
def sigint_handler(self, sig, frame):
def show_detailed_results(self) -> None:
print()
print(colorize("Results:", color.BOLD))
for test in self.tests:
if not test['selected']:
print(colorize(colorize(" - {} (not selected)".
format(test['model']), color.BOLD), color.GRAY))
continue
else:
print(colorize(" - {}:".format(test['model']), color.BOLD))
for name, case in test['cases'].items():
print(" - '{}': ".format(name), end="")
n_succeeded = case['results'].count(True)
n_failed = case['results'].count(False)
n_cancelled = self.iterations - len(case['results'])
notes = [
self.note_if_any(
"succeeded", n_succeeded, self.iterations),
self.note_if_any(
"failed", n_failed, self.iterations),
self.note_if_any(
"cancelled", n_cancelled, self.iterations)]
notes_without_none = list(filter(None, notes))
print(", ".join(notes_without_none))
def was_overall_pass(self) -> bool:
for test in self.tests:
if not test['selected']:
continue
for case in test['cases'].values():
if case['results'].count(False) > 0:
return False
return True
def show_overall_result(self) -> None:
print("Overall result: {}".format(
"PASS" if self.was_overall_pass() else "FAIL"))
@staticmethod
def note_if_any(text: str, n: int, total: int) -> Optional[str]:
assert not n < 0
if n == 0:
return None
elif n == 1 and total == 1:
return text
else:
return "{} {}".format(n, text)
def sigint_handler(self, sig: signal.Signals, frame: FrameType) \
-> NoReturn:
print("Received SIGINT")
print("Stopping all processes ...")
for runner in self.active_runners:
runner.stop()
print("Stopping all processes done.")
# TODO: print interim results
self.show_detailed_results()
sys.exit(-sig)

3
test/mavsdk_tests/process_helper.py

@ -49,9 +49,6 @@ class Runner: @@ -49,9 +49,6 @@ class Runner:
self.config['model'], self.config['test_filter'])
self.log_fd = open(self.log_filename, 'w')
print("self.cmd: {}".format(self.cmd))
print("self.args: {}".format(self.args))
print("self.cwd: {}".format(self.cwd))
self.process = subprocess.Popen(
[self.cmd] + self.args,
cwd=self.cwd,

Loading…
Cancel
Save