You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
655 lines
22 KiB
655 lines
22 KiB
#!/usr/bin/env python3 |
|
|
|
import argparse |
|
import datetime |
|
import fnmatch |
|
import json |
|
import math |
|
import os |
|
import psutil # type: ignore |
|
import signal |
|
import subprocess |
|
import sys |
|
import time |
|
from logger_helper import color, colorize |
|
import process_helper as ph |
|
from typing import Any, Dict, List, NoReturn, TextIO, Optional |
|
from types import FrameType |
|
|
|
try: |
|
import requests |
|
except ImportError as e: |
|
print("Failed to import requests: " + str(e)) |
|
print("") |
|
print("You may need to install it using:") |
|
print(" pip3 install --user requests") |
|
print("") |
|
sys.exit(1) |
|
|
|
|
|
def main() -> NoReturn: |
|
|
|
parser = argparse.ArgumentParser() |
|
parser.add_argument("--log-dir", |
|
help="Directory for log files", default="logs") |
|
parser.add_argument("--speed-factor", default=1, |
|
help="how fast to run the simulation") |
|
parser.add_argument("--iterations", type=int, default=1, |
|
help="how often to run all tests") |
|
parser.add_argument("--abort-early", action='store_true', |
|
help="abort on first unsuccessful test") |
|
parser.add_argument("--gui", default=False, action='store_true', |
|
help="display the visualization for a simulation") |
|
parser.add_argument("--model", type=str, default='all', |
|
help="only run tests for one model") |
|
parser.add_argument("--case", type=str, default='all', |
|
help="only run tests for one case " |
|
"(or multiple cases with wildcard '*')") |
|
parser.add_argument("--debugger", default="", |
|
help="choice from valgrind, callgrind, gdb, lldb") |
|
parser.add_argument("--upload", default=False, action='store_true', |
|
help="Upload logs to logs.px4.io") |
|
parser.add_argument("--verbose", default=False, action='store_true', |
|
help="enable more verbose output") |
|
parser.add_argument("config_file", help="JSON config file to use") |
|
parser.add_argument("--build-dir", type=str, |
|
default='build/px4_sitl_default/', |
|
help="relative path where the built files are stored") |
|
args = parser.parse_args() |
|
|
|
with open(args.config_file) as json_file: |
|
config = json.load(json_file) |
|
|
|
if config["mode"] != "sitl" and args.gui: |
|
print("--gui is not compatible with the mode '{}'" |
|
.format(config["mode"])) |
|
sys.exit(1) |
|
|
|
if not is_everything_ready(config, args.build_dir): |
|
sys.exit(1) |
|
|
|
if args.verbose: |
|
print("Creating directory: {}".format(args.log_dir)) |
|
os.makedirs(args.log_dir, exist_ok=True) |
|
|
|
tester = Tester( |
|
config, |
|
args.iterations, |
|
args.abort_early, |
|
args.speed_factor, |
|
args.model, |
|
args.case, |
|
args.debugger, |
|
args.log_dir, |
|
args.gui, |
|
args.verbose, |
|
args.upload, |
|
args.build_dir |
|
) |
|
signal.signal(signal.SIGINT, tester.sigint_handler) |
|
|
|
sys.exit(0 if tester.run() else 1) |
|
|
|
|
|
def is_running(process_name: str) -> bool: |
|
for proc in psutil.process_iter(attrs=['name']): |
|
if proc.info['name'] == process_name: |
|
return True |
|
return False |
|
|
|
|
|
def is_everything_ready(config: Dict[str, str], build_dir: str) -> bool: |
|
result = True |
|
|
|
if config['mode'] == 'sitl': |
|
if is_running('px4'): |
|
print("px4 process already running\n" |
|
"run `killall px4` and try again") |
|
result = False |
|
if not os.path.isfile(os.path.join(build_dir, 'bin/px4')): |
|
print("PX4 SITL is not built\n" |
|
"run `DONT_RUN=1 " |
|
"make px4_sitl gazebo mavsdk_tests` or " |
|
"`DONT_RUN=1 make px4_sitl_rtps gazebo mavsdk_tests`") |
|
result = False |
|
if config['simulator'] == 'gazebo': |
|
if is_running('gzserver'): |
|
print("gzserver process already running\n" |
|
"run `killall gzserver` and try again") |
|
result = False |
|
if is_running('gzclient'): |
|
print("gzclient process already running\n" |
|
"run `killall gzclient` and try again") |
|
result = False |
|
|
|
if not os.path.isfile(os.path.join(build_dir, |
|
'mavsdk_tests/mavsdk_tests')): |
|
print("Test runner is not built\n" |
|
"run `DONT_RUN=1 " |
|
"make px4_sitl gazebo mavsdk_tests` or " |
|
"`DONT_RUN=1 make px4_sitl_rtps gazebo mavsdk_tests`") |
|
result = False |
|
|
|
return result |
|
|
|
|
|
class Tester: |
|
def __init__(self, |
|
config: Dict[str, Any], |
|
iterations: int, |
|
abort_early: bool, |
|
speed_factor: float, |
|
model: str, |
|
case: str, |
|
debugger: str, |
|
log_dir: str, |
|
gui: bool, |
|
verbose: bool, |
|
upload: bool, |
|
build_dir: str): |
|
self.config = config |
|
self.build_dir = build_dir |
|
self.active_runners: List[ph.Runner] |
|
self.iterations = iterations |
|
self.abort_early = abort_early |
|
self.tests = self.determine_tests(config['tests'], model, case) |
|
self.speed_factor = speed_factor |
|
self.debugger = debugger |
|
self.log_dir = log_dir |
|
self.gui = gui |
|
self.verbose = verbose |
|
self.upload = upload |
|
self.start_time = datetime.datetime.now() |
|
self.log_fd: Any[TextIO] = None |
|
|
|
@staticmethod |
|
def wildcard_match(pattern: str, potential_match: str) -> bool: |
|
return fnmatch.fnmatchcase(potential_match, pattern) |
|
|
|
@staticmethod |
|
def plural_s(n_items: int) -> str: |
|
if n_items > 1: |
|
return "s" |
|
else: |
|
return "" |
|
|
|
@staticmethod |
|
def query_test_cases(build_dir: str, filter: str) -> List[str]: |
|
cmd = os.path.join(build_dir, 'mavsdk_tests/mavsdk_tests') |
|
args = ["--list-test-names-only", filter] |
|
p = subprocess.Popen( |
|
[cmd] + args, |
|
stdin=subprocess.PIPE, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT) |
|
assert p.stdout is not None |
|
cases = str(p.stdout.read().decode("utf-8")).strip().split('\n') |
|
return cases |
|
|
|
def determine_tests(self, |
|
tests: List[Dict[str, Any]], |
|
model: str, |
|
case: str) -> List[Dict[str, Any]]: |
|
for test in tests: |
|
test['selected'] = (model == 'all' or model == test['model']) |
|
test['cases'] = dict.fromkeys( |
|
self.query_test_cases(self.build_dir, test['test_filter'])) |
|
for key in test['cases'].keys(): |
|
test['cases'][key] = { |
|
'selected': (test['selected'] and |
|
(case == 'all' or |
|
self.wildcard_match(case, key)))} |
|
return tests |
|
|
|
def run(self) -> bool: |
|
self.show_plans() |
|
self.prepare_for_results() |
|
self.run_tests() |
|
self.show_detailed_results() |
|
self.show_overall_result() |
|
return self.was_overall_pass() |
|
|
|
def show_plans(self) -> None: |
|
print() |
|
print(colorize( |
|
"About to run {} test case{} for {} selected model{} " |
|
"({} iteration{}):". |
|
format(self.num_cases(), self.plural_s(self.num_cases()), |
|
self.num_models(), self.plural_s(self.num_models()), |
|
self.iterations, self.plural_s(self.iterations)), |
|
color.BOLD)) |
|
|
|
for test in self.tests: |
|
if not test['selected']: |
|
print(colorize(colorize( |
|
" - {} (not selected)".format(test['model']), |
|
color.BOLD), color.GRAY)) |
|
continue |
|
print(colorize(" - {}:".format(test['model']), color.BOLD)) |
|
for key, case in test['cases'].items(): |
|
if case['selected']: |
|
print(" - '{}'".format(key)) |
|
else: |
|
print(colorize(" - '{}' (not selected)".format(key), |
|
color.GRAY)) |
|
print() |
|
|
|
def num_models(self) -> int: |
|
return sum(map( |
|
lambda test: 1 if test['selected'] else 0, |
|
self.tests)) |
|
|
|
def num_cases(self) -> int: |
|
n_cases = 0 |
|
for test in self.tests: |
|
if not test['selected']: |
|
continue |
|
n_cases += self.num_cases_for_test(test) |
|
|
|
return n_cases |
|
|
|
def num_cases_for_test(self, test: Dict[str, Any]) -> int: |
|
n_cases = 0 |
|
for case in test['cases'].values(): |
|
if not case['selected']: |
|
continue |
|
n_cases += 1 |
|
return n_cases |
|
|
|
def prepare_for_results(self) -> None: |
|
for test in self.tests: |
|
if not test['selected']: |
|
continue |
|
for key, value in test['cases'].items(): |
|
if not value['selected']: |
|
continue |
|
value['results'] = [] |
|
|
|
def run_tests(self) -> None: |
|
for iteration in range(self.iterations): |
|
if self.iterations > 1: |
|
print(colorize("%%% Test iteration: {} / {}". |
|
format(iteration + 1, self.iterations), color.BOLD)) |
|
|
|
success = self.run_iteration(iteration) |
|
if not success: |
|
if self.abort_early: |
|
break |
|
|
|
def run_iteration(self, iteration: int) -> bool: |
|
iteration_success = True |
|
for test in self.tests: |
|
if not test['selected']: |
|
continue |
|
|
|
print(colorize( |
|
"==> Running tests for {}".format(test['model']), |
|
color.BOLD)) |
|
|
|
test_i = 0 |
|
for key, case_value in test['cases'].items(): |
|
if not case_value['selected']: |
|
continue |
|
|
|
print("--> Test case {} of {}: '{}' running ...". |
|
format(test_i+1, |
|
self.num_cases_for_test(test), |
|
key)) |
|
|
|
log_dir = self.get_log_dir(iteration, test['model'], key) |
|
if self.verbose: |
|
print("Creating log directory: {}" |
|
.format(log_dir)) |
|
os.makedirs(log_dir, exist_ok=True) |
|
|
|
was_success = self.run_test_case(test, key, log_dir) |
|
|
|
print("--- Test case {} of {}: '{}' {}." |
|
.format(test_i+1, |
|
self.num_cases_for_test(test), |
|
key, |
|
colorize("succeeded", color.GREEN) |
|
if was_success |
|
else colorize("failed", color.RED))) |
|
|
|
if not was_success: |
|
iteration_success = False |
|
|
|
if not was_success and self.abort_early: |
|
print("Aborting early") |
|
return False |
|
|
|
test_i += 1 |
|
|
|
return iteration_success |
|
|
|
def get_log_dir(self, iteration: int, model: str, case: str) -> str: |
|
date_and_time = self.start_time.strftime("%Y-%m-%dT%H-%M-%SZ") |
|
foldername = os.path.join(self.log_dir, date_and_time) |
|
|
|
if self.iterations > 1: |
|
# Use as many zeros in foldername as required. |
|
digits = math.floor(math.log10(self.iterations)) + 1 |
|
foldername = os.path.join(foldername, |
|
str(iteration + 1).zfill(digits)) |
|
|
|
foldername = os.path.join(foldername, model) |
|
foldername = os.path.join(foldername, case.replace(" ", "_")) |
|
|
|
return foldername |
|
|
|
def get_max_speed_factor(self, test: Dict[str, Any]) -> float: |
|
speed_factor = self.speed_factor |
|
if "max_speed_factor" in test: |
|
speed_factor = min(float(speed_factor), test["max_speed_factor"]) |
|
return speed_factor |
|
|
|
def run_test_case(self, test: Dict[str, Any], |
|
case: str, log_dir: str) -> bool: |
|
|
|
logfile_path = self.determine_logfile_path(log_dir, 'combined') |
|
self.start_combined_log(logfile_path) |
|
|
|
self.start_runners(log_dir, test, case) |
|
|
|
test_timeout_s = test['timeout_min']*60 |
|
while self.active_runners[-1].time_elapsed_s() < test_timeout_s: |
|
returncode = self.active_runners[-1].poll() |
|
|
|
self.collect_runner_output() |
|
|
|
if returncode is not None: |
|
is_success = (returncode == 0) |
|
break |
|
|
|
else: |
|
print(colorize( |
|
"Test timeout of {} mins triggered!". |
|
format(test['timeout_min']), |
|
color.BOLD)) |
|
is_success = False |
|
|
|
self.stop_runners() |
|
# Collect what was left in output buffers. |
|
self.collect_runner_output() |
|
self.stop_combined_log() |
|
|
|
result = {'success': is_success, |
|
'logfiles': [runner.get_log_filename() |
|
for runner in self.active_runners]} |
|
test['cases'][case]['results'].append(result) |
|
|
|
if not is_success: |
|
if not self.verbose: |
|
print(self.get_combined_log(logfile_path)) |
|
print("Logfiles: ") |
|
print(" - {}".format(logfile_path)) |
|
for runner in self.active_runners: |
|
print(" - {}".format(runner.get_log_filename())) |
|
|
|
if self.upload: |
|
ulog_file = self.parse_for_ulog_file( |
|
self.get_combined_log(logfile_path)) |
|
self.upload_log(ulog_file, test['model'], case, is_success) |
|
|
|
return is_success |
|
|
|
def start_runners(self, |
|
log_dir: str, |
|
test: Dict[str, Any], |
|
case: str) -> None: |
|
self.active_runners = [] |
|
|
|
if self.config['mode'] == 'sitl': |
|
px4_runner = ph.Px4Runner( |
|
os.getcwd(), |
|
log_dir, |
|
test['model'], |
|
case, |
|
self.get_max_speed_factor(test), |
|
self.debugger, |
|
self.verbose, |
|
self.build_dir) |
|
self.active_runners.append(px4_runner) |
|
|
|
if self.config['simulator'] == 'gazebo': |
|
gzserver_runner = ph.GzserverRunner( |
|
os.getcwd(), |
|
log_dir, |
|
test['vehicle'], |
|
case, |
|
self.get_max_speed_factor(test), |
|
self.verbose, |
|
self.build_dir) |
|
self.active_runners.append(gzserver_runner) |
|
|
|
gzmodelspawn_runner = ph.GzmodelspawnRunner( |
|
os.getcwd(), |
|
log_dir, |
|
test['vehicle'], |
|
case, |
|
self.verbose, |
|
self.build_dir) |
|
self.active_runners.append(gzmodelspawn_runner) |
|
|
|
if self.gui: |
|
gzclient_runner = ph.GzclientRunner( |
|
os.getcwd(), |
|
log_dir, |
|
test['model'], |
|
case, |
|
self.verbose) |
|
self.active_runners.append(gzclient_runner) |
|
|
|
mavsdk_tests_runner = ph.TestRunner( |
|
os.getcwd(), |
|
log_dir, |
|
test['model'], |
|
case, |
|
self.config['mavlink_connection'], |
|
self.speed_factor, |
|
self.verbose, |
|
self.build_dir) |
|
self.active_runners.append(mavsdk_tests_runner) |
|
|
|
abort = False |
|
for runner in self.active_runners: |
|
runner.set_log_filename( |
|
self.determine_logfile_path(log_dir, runner.name)) |
|
|
|
if not self.try_to_run_several_times(runner): |
|
abort = True |
|
break |
|
|
|
if abort: |
|
print("Could not start runner: {}".format(runner.name)) |
|
self.collect_runner_output() |
|
self.stop_combined_log() |
|
self.stop_runners() |
|
sys.exit(1) |
|
|
|
def try_to_run_several_times(self, runner: ph.Runner) -> bool: |
|
for _ in range(3): |
|
runner.start() |
|
|
|
if runner.has_started_ok(): |
|
return True |
|
|
|
else: |
|
runner.stop() |
|
time.sleep(1) |
|
|
|
return False |
|
|
|
def stop_runners(self) -> None: |
|
for runner in self.active_runners: |
|
runner.stop() |
|
|
|
def collect_runner_output(self) -> None: |
|
for runner in self.active_runners: |
|
while True: |
|
line = runner.get_output_line() |
|
if not line: |
|
break |
|
|
|
self.add_to_combined_log(line) |
|
if self.verbose: |
|
print(line, end="") |
|
|
|
def parse_for_ulog_file(self, log: str) -> Optional[str]: |
|
|
|
match = "[logger] Opened full log file: ./" |
|
for line in log.splitlines(): |
|
found = line.find(match) |
|
if found != -1: |
|
return os.path.join(os.getcwd(), self.build_dir, |
|
"tmp_mavsdk_tests/rootfs", |
|
line[found+len(match):]) |
|
return None |
|
|
|
def upload_log(self, ulog_path: Optional[str], |
|
model: str, case: str, success: bool) -> None: |
|
if not ulog_path: |
|
print(" Could not find ulog log file to upload") |
|
return |
|
|
|
if not os.getenv('GITHUB_RUN_ID'): |
|
print(" Upload only implemented for GitHub Actions CI") |
|
return |
|
|
|
print(" Uploading logfile '{}' ...".format(ulog_path)) |
|
|
|
server = "https://logs.px4.io" |
|
|
|
result_str = "passing" if success else "failing" |
|
|
|
payload = { |
|
"type": "flightreport", |
|
"description": "SITL integration test - {}: '{}' -> {}" |
|
.format(model, case, result_str), |
|
"feedback": |
|
"{}/{}/actions/runs/{}" |
|
.format(os.getenv("GITHUB_SERVER_URL"), |
|
os.getenv("GITHUB_REPOSITORY"), |
|
os.getenv("GITHUB_RUN_ID")), |
|
"email": "", |
|
"source": "CI", |
|
"videoUrl": "", |
|
"rating": "notset", |
|
"windSpeed": -1, |
|
"public": "true" |
|
} |
|
|
|
with open(ulog_path, 'rb') as f: |
|
r = requests.post(server + "/upload", |
|
data=payload, |
|
files={'filearg': f}, |
|
allow_redirects=False) |
|
if r.status_code == 302: # redirect |
|
if 'Location' in r.headers: |
|
plot_url = r.headers['Location'] |
|
if len(plot_url) > 0 and plot_url[0] == '/': |
|
plot_url = server + plot_url |
|
print(" Uploaded to: " + plot_url) |
|
else: |
|
print(" Upload failed with status_code: {}" |
|
.format(r.status_code)) |
|
|
|
def start_combined_log(self, filename: str) -> None: |
|
self.log_fd = open(filename, 'w') |
|
|
|
def stop_combined_log(self) -> None: |
|
if self.log_fd: |
|
self.log_fd.close() |
|
|
|
def add_to_combined_log(self, output: str) -> None: |
|
self.log_fd.write(output) |
|
|
|
def get_combined_log(self, filename: str) -> str: |
|
with open(filename, 'r') as f: |
|
return f.read() |
|
|
|
@staticmethod |
|
def determine_logfile_path(log_dir: str, desc: str) -> str: |
|
return os.path.join(log_dir, "log-{}.log".format(desc)) |
|
|
|
def show_detailed_results(self) -> None: |
|
print() |
|
print(colorize("Results:", color.BOLD)) |
|
|
|
for test in self.tests: |
|
if not test['selected']: |
|
print(colorize(colorize( |
|
" - {} (not selected)". |
|
format(test['model']), color.BOLD), color.GRAY)) |
|
continue |
|
else: |
|
print(colorize(" - {}:".format(test['model']), color.BOLD)) |
|
|
|
for name, case in test['cases'].items(): |
|
if not case['selected']: |
|
continue |
|
print(" - '{}': ".format(name), end="") |
|
|
|
n_succeeded = [result['success'] |
|
for result in case['results']].count(True) |
|
n_failed = [result['success'] |
|
for result in case['results']].count(False) |
|
n_cancelled = self.iterations - len(case['results']) |
|
|
|
notes = [ |
|
self.note_if_any( |
|
colorize("{}succeeded", color.GREEN), |
|
n_succeeded, self.iterations), |
|
self.note_if_any( |
|
colorize("{}failed", color.RED), |
|
n_failed, self.iterations), |
|
self.note_if_any( |
|
colorize("{}cancelled", color.GRAY), |
|
n_cancelled, self.iterations)] |
|
notes_without_none = list(filter(None, notes)) |
|
print(", ".join(notes_without_none)) |
|
|
|
def was_overall_pass(self) -> bool: |
|
for test in self.tests: |
|
if not test['selected']: |
|
continue |
|
|
|
for case in test['cases'].values(): |
|
if not case['selected']: |
|
continue |
|
for result in case['results']: |
|
if result['success'] is not True: |
|
return False |
|
return True |
|
|
|
def show_overall_result(self) -> None: |
|
print(colorize( |
|
"Overall result: {}".format( |
|
colorize("PASS", color.GREEN) |
|
if self.was_overall_pass() |
|
else colorize("FAIL", color.RED)), color.BOLD)) |
|
|
|
@staticmethod |
|
def note_if_any(text_to_format: str, n: int, total: int) -> Optional[str]: |
|
assert not n < 0 |
|
if n == 0: |
|
return None |
|
elif n == 1 and total == 1: |
|
return text_to_format.format("") |
|
else: |
|
return text_to_format.format(str(n) + " ") |
|
|
|
def sigint_handler(self, sig: int, frame: Optional[FrameType]) \ |
|
-> NoReturn: |
|
print("Received SIGINT") |
|
print("Stopping all processes ...") |
|
self.stop_runners() |
|
self.stop_combined_log() |
|
print("Stopping all processes done.") |
|
self.show_detailed_results() |
|
sys.exit(-sig) |
|
|
|
|
|
if __name__ == '__main__': |
|
main()
|
|
|