diff --git a/test/mavsdk_tests/logger_helper.py b/test/mavsdk_tests/logger_helper.py index a5766ef147..b63a0df0c0 100644 --- a/test/mavsdk_tests/logger_helper.py +++ b/test/mavsdk_tests/logger_helper.py @@ -15,6 +15,7 @@ class color(Enum): GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' + GRAY = '\033[90m' BOLD = '\033[1m' UNDERLINE = '\033[4m' RESET = '\033[0m' diff --git a/test/mavsdk_tests/mavsdk_test_runner.py b/test/mavsdk_tests/mavsdk_test_runner.py index f5810077da..b81e2e8569 100755 --- a/test/mavsdk_tests/mavsdk_test_runner.py +++ b/test/mavsdk_tests/mavsdk_test_runner.py @@ -9,9 +9,11 @@ import subprocess import sys from logger_helper import color, colorize import process_helper as ph +from typing import Any, Dict, List, NoReturn, Optional +from types import FrameType -def main(): +def main() -> NoReturn: parser = argparse.ArgumentParser() parser.add_argument("--log-dir", @@ -61,30 +63,17 @@ def main(): ) signal.signal(signal.SIGINT, tester.sigint_handler) - sys.exit(tester.run()) + sys.exit(0 if tester.run() else 1) -def determine_tests(filter): - cmd = os.getcwd() + "/build/px4_sitl_default/mavsdk_tests/mavsdk_tests" - args = ["--list-test-names-only", filter] - p = subprocess.Popen( - [cmd] + args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - assert p.stdout is not None - tests = str(p.stdout.read().decode("utf-8")).strip().split('\n') - return tests - - -def is_running(process_name): +def is_running(process_name: str) -> bool: for proc in psutil.process_iter(attrs=['name']): if proc.info['name'] == process_name: return True return False -def is_everything_ready(config): +def is_everything_ready(config: Dict[str, str]) -> bool: result = True if config['mode'] == 'sitl': @@ -118,115 +107,158 @@ def is_everything_ready(config): class Tester: def __init__(self, - config, - iterations, - abort_early, - speed_factor, - model, - debugger, - log_dir, - gui, - verbose): - self.active_runners = [] + config: Dict[str, Any], + iterations: int, + abort_early: bool, + speed_factor: float, + model: str, + debugger: str, + log_dir: str, + gui: bool, + verbose: bool): + self.config = config + self.active_runners: List[ph.Runner] self.iterations = iterations self.abort_early = abort_early - self.config = config + self.tests = self.determine_tests(config['tests'], model) self.speed_factor = speed_factor - self.model = model self.debugger = debugger self.log_dir = log_dir self.gui = gui self.verbose = verbose - def run(self): - overall_success = True - + @classmethod + def determine_tests(cls, tests: List[Dict[str, Any]], model: str) \ + -> List[Dict[str, Any]]: + for test in tests: + test['selected'] = (model == 'all') or model == test['model'] + test['cases'] = dict.fromkeys( + cls.determine_test_cases(test['test_filter'])) + + return tests + + @staticmethod + def determine_test_cases(filter: str) -> List[str]: + cmd = os.getcwd() + "/build/px4_sitl_default/mavsdk_tests/mavsdk_tests" + args = ["--list-test-names-only", filter] + p = subprocess.Popen( + [cmd] + args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + assert p.stdout is not None + cases = str(p.stdout.read().decode("utf-8")).strip().split('\n') + return cases + + @staticmethod + def plural_s(n_items: int) -> str: + if n_items > 1: + return "s" + else: + return "" + + def run(self) -> bool: + self.show_plans() + self.prepare_for_results() + self.run_tests() + self.show_detailed_results() + self.show_overall_result() + return self.was_overall_pass() + + def show_plans(self) -> None: + n_models = sum(map( + lambda test: 1 if test['selected'] else 0, + self.tests)) + n_cases = sum(map( + lambda test: len(test['cases']) if test['selected'] else 0, + self.tests)) + + print() + print(colorize( + "About to run {} test case{} for {} selected model{} " + "({} iteration{}):". + format(n_cases, self.plural_s(n_cases), + n_models, self.plural_s(n_models), + self.iterations, self.plural_s(self.iterations)), + color.BOLD)) + + for test in self.tests: + if not test['selected']: + print(colorize(colorize( + " - {} (not selected)".format(test['model']), + color.BOLD), color.GRAY)) + continue + print(colorize(" - {}:".format(test['model']), color.BOLD)) + for case in test['cases']: + print(" - '{}'".format(case)) + print() + + def prepare_for_results(self) -> None: + # prepare dict entry for results + for test in self.tests: + if not test['selected']: + continue + for key in test['cases'].keys(): + test['cases'][key] = {'results': []} + + def run_tests(self) -> None: for iteration in range(self.iterations): if self.iterations > 1: - print("Test iteration: {} / {}". - format(iteration + 1, self.iterations)) - - was_success = self.run_test_group() - - if not was_success: - overall_success = False - - if self.iterations > 1 and not was_success and self.abort_early: - print("Aborting with a failure in test run {}/{}". + print("%%% Test iteration: {} / {}". format(iteration + 1, self.iterations)) - break - if overall_success: - print(colorize(colorize( - "Overall result: success!", color.GREEN), color.BOLD)) - return 0 - else: - print(colorize(colorize( - "Overall result: failure!", color.RED), color.BOLD)) - return 1 + success = self.run_iteration() + if not success: + if self.abort_early: + break - def run_test_group(self): - overall_success = True + def run_iteration(self) -> bool: + iteration_success = True - tests = self.config["tests"] + for test in self.tests: + if not test['selected']: + continue - if self.model == 'all': - models = tests - else: - found = False - for elem in tests: - if elem['model'] == self.model: - models = [elem] - found = True - if not found: - print("Specified model is not defined") - models = [] - - for group in models: print(colorize( - "==> Running tests for '{}' with filter '{}'" - .format(group['model'], group['test_filter']), + "==> Running tests for {}".format(test['model']), color.BOLD)) - tests = determine_tests(group['test_filter']) - - for i, test in enumerate(tests): - print("--> Test {} of {}: '{}' running ...". - format(i+1, len(tests), test)) - was_success = self.run_test(test, group) + for i, case in enumerate(test['cases']): + print("--> Test case {} of {}: '{}' running ...". + format(i+1, len(test['cases']), case)) + was_success = self.run_test_case(test, case) if was_success: print(colorize( - "--- Test {} of {}: '{}' succeeded." - .format(i+1, len(tests), test), + "--- Test case {} of {}: '{}' succeeded." + .format(i+1, len(test['cases']), case), color.GREEN)) else: print(colorize( - "--- Test {} of {}: '{}' failed." - .format(i+1, len(tests), test), + "--- Test case {} of {}: '{}' failed." + .format(i+1, len(test['cases']), case), color.RED)) - # TODO: now print the test output if not was_success: - overall_success = False + iteration_success = False if not was_success and self.abort_early: print("Aborting early") return False - return overall_success + return iteration_success - def run_test(self, test, group): + def run_test_case(self, test: Dict[str, Any], case: str) -> bool: self.active_runners = [] speed_factor = self.speed_factor - if "max_speed_factor" in group: - speed_factor = min(int(speed_factor), group["max_speed_factor"]) + if "max_speed_factor" in test: + speed_factor = min(float(speed_factor), test["max_speed_factor"]) if self.config['mode'] == 'sitl': px4_runner = ph.Px4Runner( os.getcwd(), self.log_dir, - group, + test, speed_factor, self.debugger, self.verbose) @@ -237,7 +269,7 @@ class Tester: gzserver_runner = ph.GzserverRunner( os.getcwd(), self.log_dir, - group, + test, self.speed_factor, self.verbose) gzserver_runner.start() @@ -247,7 +279,7 @@ class Tester: gzclient_runner = ph.GzclientRunner( os.getcwd(), self.log_dir, - group, + test, self.verbose) gzclient_runner.start() self.active_runners.append(gzclient_runner) @@ -255,15 +287,15 @@ class Tester: test_runner = ph.TestRunner( os.getcwd(), self.log_dir, - group, test, + case, self.config['mavlink_connection'], self.verbose) test_runner.start() self.active_runners.append(test_runner) - while test_runner.time_elapsed_s() < group['timeout_min']*60: + while test_runner.time_elapsed_s() < test['timeout_min']*60: returncode = test_runner.poll() if returncode is not None: is_success = (returncode == 0) @@ -276,22 +308,78 @@ class Tester: else: print(colorize( "Test timeout of {} mins triggered!". - format(group['timeout_min']), + format(test['timeout_min']), color.BOLD)) is_success = False for runner in self.active_runners: runner.stop() + test['cases'][case]['results'].append(is_success) return is_success - def sigint_handler(self, sig, frame): + def show_detailed_results(self) -> None: + + print() + print(colorize("Results:", color.BOLD)) + + for test in self.tests: + if not test['selected']: + print(colorize(colorize(" - {} (not selected)". + format(test['model']), color.BOLD), color.GRAY)) + continue + else: + print(colorize(" - {}:".format(test['model']), color.BOLD)) + + for name, case in test['cases'].items(): + print(" - '{}': ".format(name), end="") + + n_succeeded = case['results'].count(True) + n_failed = case['results'].count(False) + n_cancelled = self.iterations - len(case['results']) + + notes = [ + self.note_if_any( + "succeeded", n_succeeded, self.iterations), + self.note_if_any( + "failed", n_failed, self.iterations), + self.note_if_any( + "cancelled", n_cancelled, self.iterations)] + notes_without_none = list(filter(None, notes)) + print(", ".join(notes_without_none)) + + def was_overall_pass(self) -> bool: + for test in self.tests: + if not test['selected']: + continue + + for case in test['cases'].values(): + if case['results'].count(False) > 0: + return False + return True + + def show_overall_result(self) -> None: + print("Overall result: {}".format( + "PASS" if self.was_overall_pass() else "FAIL")) + + @staticmethod + def note_if_any(text: str, n: int, total: int) -> Optional[str]: + assert not n < 0 + if n == 0: + return None + elif n == 1 and total == 1: + return text + else: + return "{} {}".format(n, text) + + def sigint_handler(self, sig: signal.Signals, frame: FrameType) \ + -> NoReturn: print("Received SIGINT") print("Stopping all processes ...") for runner in self.active_runners: runner.stop() print("Stopping all processes done.") - # TODO: print interim results + self.show_detailed_results() sys.exit(-sig) diff --git a/test/mavsdk_tests/process_helper.py b/test/mavsdk_tests/process_helper.py index d95f0021ab..ac128efd3f 100644 --- a/test/mavsdk_tests/process_helper.py +++ b/test/mavsdk_tests/process_helper.py @@ -49,9 +49,6 @@ class Runner: self.config['model'], self.config['test_filter']) self.log_fd = open(self.log_filename, 'w') - print("self.cmd: {}".format(self.cmd)) - print("self.args: {}".format(self.args)) - print("self.cwd: {}".format(self.cwd)) self.process = subprocess.Popen( [self.cmd] + self.args, cwd=self.cwd,