Browse Source
* refactor ekf analysis part 1: move plotting to functions * add plot_check_flags plot function * put plots in seperate file * use object-oriented programming for plotting * move functions for post processing and pdf report creation to new files * add in_air_detector and description as a csv file * refactor metrics and checks into separate functions * refactor metrics into seperate file, seperate plotting * ecl-ekf tools: re-structure folder and move results table generation * ecl-ekf-tool: fix imports and test_results_table * ecl-ekf tools: bugfix output observer tracking error plot * ecl-ekf-tools: update batch processing to new api, fix exception handling * ecl-ekf-tools: use correct in_air_detector * ecl-ekf-tools: rename csv file containing the bare test results table * ecl-tools: refactor for improving readability * ecl-ekf tools: small plotting bugfixes * ecl-ekf tools: small bugfixes in_air time, on_ground_trans, filenames * ecl-ekf-tools: fix amber metric bug * ecl-ekf-tools: remove custom function in inairdetector * ecl-ekf-tools: remove import of pandas * ecl-ekf-tools: add python interpreter to the script start * ecl-ekf-tools pdf_report: fix python interpreter line * px4-dev-ros-kinetic: update container tag to 2019-02-13 * ecl-ekf-tools python interpreter line: call python3 bin directly * ecl-ekf-tools: change airtime from namedtuple to class for python 3.5 * ecl-ekf-tools: update docker image px4-dev-ros-kinetic * ecl-ekf-tools: fix memory leak by correctly closing matplotlib figuressbg
JohannesBrand
6 years ago
committed by
GitHub
19 changed files with 1839 additions and 1508 deletions
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,143 @@
@@ -0,0 +1,143 @@
|
||||
#! /usr/bin/env python3 |
||||
""" |
||||
function collection for performing ecl ekf checks |
||||
""" |
||||
|
||||
from typing import Tuple, List, Dict |
||||
|
||||
|
||||
def perform_ecl_ekf_checks( |
||||
metrics: Dict[str, float], sensor_checks: List[str], innov_fail_checks: List[str], |
||||
check_levels: Dict[str, float]) -> Tuple[Dict[str, str], str]: |
||||
""" |
||||
# performs the imu, sensor, amd ekf checks and calculates a master status. |
||||
:param metrics: |
||||
:param sensor_checks: |
||||
:param innov_fail_checks: |
||||
:param check_levels: |
||||
:return: |
||||
""" |
||||
|
||||
imu_status = perform_imu_checks(metrics, check_levels) |
||||
|
||||
sensor_status = perform_sensor_innov_checks( |
||||
metrics, sensor_checks, innov_fail_checks, check_levels) |
||||
|
||||
ekf_status = dict() |
||||
ekf_status['filter_fault_status'] = 'Fail' if metrics['filter_faults_max'] > 0 else 'Pass' |
||||
|
||||
# combine the status from the checks |
||||
combined_status = dict() |
||||
combined_status.update(imu_status) |
||||
combined_status.update(sensor_status) |
||||
combined_status.update(ekf_status) |
||||
if any(val == 'Fail' for val in combined_status.values()): |
||||
master_status = 'Fail' |
||||
elif any(val == 'Warning' for val in combined_status.values()): |
||||
master_status = 'Warning' |
||||
else: |
||||
master_status = 'Pass' |
||||
|
||||
return combined_status, master_status |
||||
|
||||
|
||||
def perform_imu_checks( |
||||
imu_metrics: Dict[str, float], check_levels: Dict[str, float]) -> Dict[str, str]: |
||||
""" |
||||
performs the imu checks. |
||||
:param imu_metrics: |
||||
:param check_levels: |
||||
:return: |
||||
""" |
||||
|
||||
# check for IMU sensor warnings |
||||
imu_status = dict() |
||||
|
||||
# perform the vibration check |
||||
imu_status['imu_vibration_check'] = 'Pass' |
||||
for imu_vibr_metric in ['imu_coning', 'imu_hfdang', 'imu_hfdvel']: |
||||
mean_metric = '{:s}_mean'.format(imu_vibr_metric) |
||||
peak_metric = '{:s}_peak'.format(imu_vibr_metric) |
||||
if imu_metrics[mean_metric] > check_levels['{:s}_warn'.format(mean_metric)] \ |
||||
or imu_metrics[peak_metric] > check_levels['{:s}_warn'.format(peak_metric)]: |
||||
imu_status['imu_vibration_check'] = 'Warning' |
||||
|
||||
if imu_status['imu_vibration_check'] == 'Warning': |
||||
print('IMU vibration check warning.') |
||||
|
||||
# perform the imu bias check |
||||
if imu_metrics['imu_dang_bias_median'] > check_levels['imu_dang_bias_median_warn'] \ |
||||
or imu_metrics['imu_dvel_bias_median'] > check_levels['imu_dvel_bias_median_warn']: |
||||
imu_status['imu_bias_check'] = 'Warning' |
||||
print('IMU bias check warning.') |
||||
else: |
||||
imu_status['imu_bias_check'] = 'Pass' |
||||
|
||||
# perform output predictor |
||||
if imu_metrics['output_obs_ang_err_median'] > check_levels['obs_ang_err_median_warn'] \ |
||||
or imu_metrics['output_obs_vel_err_median'] > check_levels['obs_vel_err_median_warn'] \ |
||||
or imu_metrics['output_obs_pos_err_median'] > check_levels['obs_pos_err_median_warn']: |
||||
imu_status['imu_output_predictor_check'] = 'Warning' |
||||
print('IMU output predictor check warning.') |
||||
else: |
||||
imu_status['imu_output_predictor_check'] = 'Pass' |
||||
|
||||
imu_status['imu_sensor_status'] = 'Warning' if any( |
||||
val == 'Warning' for val in imu_status.values()) else 'Pass' |
||||
|
||||
return imu_status |
||||
|
||||
|
||||
def perform_sensor_innov_checks( |
||||
metrics: Dict[str, float], sensor_checks: List[str], innov_fail_checks: List[str], |
||||
check_levels: Dict[str, float]) -> Dict[str, str]: |
||||
""" |
||||
performs the sensor checks. |
||||
:param metrics: |
||||
:param sensor_checks: |
||||
:param innov_fail_checks: |
||||
:param check_levels: |
||||
:return: |
||||
""" |
||||
|
||||
sensor_status = dict() |
||||
|
||||
for result_id in ['hgt', 'mag', 'vel', 'pos', 'tas', 'hagl']: |
||||
|
||||
# only run sensor checks, if they apply. |
||||
if result_id in sensor_checks: |
||||
if metrics['{:s}_percentage_amber'.format(result_id)] > check_levels[ |
||||
'{:s}_amber_fail_pct'.format(result_id)]: |
||||
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Fail' |
||||
print('{:s} sensor check failure.'.format(result_id)) |
||||
elif metrics['{:s}_percentage_amber'.format(result_id)] > check_levels[ |
||||
'{:s}_amber_warn_pct'.format(result_id)]: |
||||
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Warning' |
||||
print('{:s} sensor check warning.'.format(result_id)) |
||||
else: |
||||
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Pass' |
||||
|
||||
# perform innovation checks. |
||||
for signal_id, metric_name, result_id in [('posv', 'hgt_fail_percentage', 'hgt'), |
||||
('magx', 'magx_fail_percentage', 'mag'), |
||||
('magy', 'magy_fail_percentage', 'mag'), |
||||
('magz', 'magz_fail_percentage', 'mag'), |
||||
('yaw', 'yaw_fail_percentage', 'yaw'), |
||||
('vel', 'vel_fail_percentage', 'vel'), |
||||
('posh', 'pos_fail_percentage', 'pos'), |
||||
('tas', 'tas_fail_percentage', 'tas'), |
||||
('hagl', 'hagl_fail_percentage', 'hagl'), |
||||
('ofx', 'ofx_fail_percentage', 'flow'), |
||||
('ofy', 'ofy_fail_percentage', 'flow')]: |
||||
|
||||
# only run innov fail checks, if they apply. |
||||
if signal_id in innov_fail_checks: |
||||
|
||||
if metrics[metric_name] > check_levels['{:s}_fail_pct'.format(result_id)]: |
||||
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Fail' |
||||
print('{:s} sensor check failure.'.format(result_id)) |
||||
else: |
||||
if not ('{:s}_sensor_status'.format(result_id) in sensor_status): |
||||
sensor_status['{:s}_sensor_status'.format(result_id)] = 'Pass' |
||||
|
||||
return sensor_status |
@ -0,0 +1,182 @@
@@ -0,0 +1,182 @@
|
||||
#! /usr/bin/env python3 |
||||
""" |
||||
detectors |
||||
""" |
||||
from typing import Optional |
||||
import numpy as np |
||||
from pyulog import ULog |
||||
|
||||
|
||||
class PreconditionError(Exception): |
||||
""" |
||||
a class for a Precondition Error |
||||
""" |
||||
|
||||
|
||||
class Airtime(object): |
||||
""" |
||||
Airtime struct. |
||||
""" |
||||
def __init__(self, take_off: float, landing: float): |
||||
self.take_off = take_off |
||||
self.landing = landing |
||||
|
||||
|
||||
class InAirDetector(object): |
||||
""" |
||||
handles airtime detection. |
||||
""" |
||||
|
||||
def __init__(self, ulog: ULog, min_flight_time_seconds: float = 0.0, |
||||
in_air_margin_seconds: float = 0.0) -> None: |
||||
""" |
||||
initializes an InAirDetector instance. |
||||
:param ulog: |
||||
:param min_flight_time_seconds: set this value to return only airtimes that are at least |
||||
min_flight_time_seconds long |
||||
:param in_air_margin_seconds: removes a margin of in_air_margin_seconds from the airtime |
||||
to avoid ground effects. |
||||
""" |
||||
|
||||
self._ulog = ulog |
||||
self._min_flight_time_seconds = min_flight_time_seconds |
||||
self._in_air_margin_seconds = in_air_margin_seconds |
||||
|
||||
try: |
||||
self._vehicle_land_detected = ulog.get_dataset('vehicle_land_detected').data |
||||
self._landed = self._vehicle_land_detected['landed'] |
||||
except: |
||||
self._in_air = [] |
||||
raise PreconditionError( |
||||
'InAirDetector: Could not find vehicle land detected message and/or landed field' |
||||
' and thus not find any airtime.') |
||||
|
||||
self._log_start = self._ulog.start_timestamp / 1.0e6 |
||||
|
||||
self._in_air = self._detect_airtime() |
||||
|
||||
|
||||
def _detect_airtime(self) -> Optional[Airtime]: |
||||
""" |
||||
detects the airtime take_off and landing of a ulog. |
||||
:return: a named tuple of ('Airtime', ['take_off', 'landing']) or None. |
||||
""" |
||||
|
||||
# test whether flight was in air at all |
||||
if (self._landed > 0).all(): |
||||
print('InAirDetector: always on ground.') |
||||
return [] |
||||
|
||||
# find the indices of all take offs and landings |
||||
take_offs = np.where(np.diff(self._landed) < 0)[0].tolist() |
||||
landings = np.where(np.diff(self._landed) > 0)[0].tolist() |
||||
|
||||
# check for start in air. |
||||
if len(take_offs) == 0 or ((len(landings) > 0) and (landings[0] < take_offs[0])): |
||||
|
||||
print('Started in air. Take first timestamp value as start point.') |
||||
take_offs = [-1] + take_offs |
||||
|
||||
# correct for offset: add 1 to take_off list |
||||
take_offs = [take_off + 1 for take_off in take_offs] |
||||
if len(landings) < len(take_offs): |
||||
print('No final landing detected. Assume last timestamp is landing.') |
||||
landings += [len(self._landed) - 2] |
||||
# correct for offset: add 1 to landing list |
||||
landings = [landing + 1 for landing in landings] |
||||
|
||||
assert len(landings) == len(take_offs), 'InAirDetector: different number of take offs' \ |
||||
' and landings.' |
||||
|
||||
in_air = [] |
||||
for take_off, landing in zip(take_offs, landings): |
||||
if (self._vehicle_land_detected['timestamp'][landing] / 1e6 - |
||||
self._in_air_margin_seconds) - \ |
||||
(self._vehicle_land_detected['timestamp'][take_off] / 1e6 + |
||||
self._in_air_margin_seconds) >= self._min_flight_time_seconds: |
||||
in_air.append(Airtime( |
||||
take_off=(self._vehicle_land_detected['timestamp'][take_off] - |
||||
self._ulog.start_timestamp) / 1.0e6 + self._in_air_margin_seconds, |
||||
landing=(self._vehicle_land_detected['timestamp'][landing] - |
||||
self._ulog.start_timestamp) / 1.0e6 - self._in_air_margin_seconds)) |
||||
if len(in_air) == 0: |
||||
print('InAirDetector: no airtime detected.') |
||||
|
||||
return in_air |
||||
|
||||
@property |
||||
def airtimes(self): |
||||
""" |
||||
airtimes |
||||
:return: |
||||
""" |
||||
return self._in_air |
||||
|
||||
@property |
||||
def take_off(self) -> Optional[float]: |
||||
""" |
||||
first take off |
||||
:return: |
||||
""" |
||||
return self._in_air[0].take_off if self._in_air else None |
||||
|
||||
@property |
||||
def landing(self) -> Optional[float]: |
||||
""" |
||||
last landing |
||||
:return: the last landing of the flight. |
||||
""" |
||||
return self._in_air[-1].landing if self._in_air else None |
||||
|
||||
@property |
||||
def log_start(self) -> Optional[float]: |
||||
""" |
||||
log start |
||||
:return: the start time of the log. |
||||
""" |
||||
return self._log_start |
||||
|
||||
def get_take_off_to_last_landing(self, dataset) -> list: |
||||
""" |
||||
return all indices of the log file between the first take_off and the |
||||
last landing. |
||||
:param dataset: |
||||
:return: |
||||
""" |
||||
try: |
||||
data = self._ulog.get_dataset(dataset).data |
||||
except: |
||||
print('InAirDetector: {:s} not found in log.'.format(dataset)) |
||||
return [] |
||||
|
||||
if self._in_air: |
||||
airtime = np.where(((data['timestamp'] - self._ulog.start_timestamp) / 1.0e6 >= |
||||
self._in_air[0].take_off) & ( |
||||
(data['timestamp'] - self._ulog.start_timestamp) / |
||||
1.0e6 < self._in_air[-1].landing))[0] |
||||
|
||||
else: |
||||
airtime = [] |
||||
|
||||
return airtime |
||||
|
||||
def get_airtime(self, dataset) -> list: |
||||
""" |
||||
return all indices of the log file that are in air |
||||
:param dataset: |
||||
:return: |
||||
""" |
||||
try: |
||||
data = self._ulog.get_dataset(dataset).data |
||||
except: |
||||
raise PreconditionError('InAirDetector: {:s} not found in log.'.format(dataset)) |
||||
|
||||
airtime = [] |
||||
if self._in_air is not None: |
||||
for i in range(len(self._in_air)): |
||||
airtime.extend(np.where(((data['timestamp'] - self._ulog.start_timestamp) / 1.0e6 >= |
||||
self._in_air[i].take_off) & ( |
||||
(data['timestamp'] - self._ulog.start_timestamp) / |
||||
1.0e6 < self._in_air[i].landing))[0]) |
||||
|
||||
return airtime |
@ -0,0 +1,183 @@
@@ -0,0 +1,183 @@
|
||||
#! /usr/bin/env python3 |
||||
""" |
||||
function collection for calculation ecl ekf metrics. |
||||
""" |
||||
|
||||
from typing import Dict, List, Tuple, Callable |
||||
|
||||
from pyulog import ULog |
||||
import numpy as np |
||||
|
||||
from analysis.detectors import InAirDetector |
||||
|
||||
def calculate_ecl_ekf_metrics( |
||||
ulog: ULog, innov_flags: Dict[str, float], innov_fail_checks: List[str], |
||||
sensor_checks: List[str], in_air: InAirDetector, in_air_no_ground_effects: InAirDetector, |
||||
red_thresh: float = 1.0, amb_thresh: float = 0.5) -> Tuple[dict, dict, dict, dict]: |
||||
|
||||
sensor_metrics = calculate_sensor_metrics( |
||||
ulog, sensor_checks, in_air, in_air_no_ground_effects, |
||||
red_thresh=red_thresh, amb_thresh=amb_thresh) |
||||
|
||||
innov_fail_metrics = calculate_innov_fail_metrics( |
||||
innov_flags, innov_fail_checks, in_air, in_air_no_ground_effects) |
||||
|
||||
imu_metrics = calculate_imu_metrics(ulog, in_air_no_ground_effects) |
||||
|
||||
estimator_status_data = ulog.get_dataset('estimator_status').data |
||||
|
||||
# Check for internal filter nummerical faults |
||||
ekf_metrics = {'filter_faults_max': np.amax(estimator_status_data['filter_fault_flags'])} |
||||
# TODO - process these bitmask's when they have been properly documented in the uORB topic |
||||
# estimator_status['health_flags'] |
||||
# estimator_status['timeout_flags'] |
||||
|
||||
# combine the metrics |
||||
combined_metrics = dict() |
||||
combined_metrics.update(imu_metrics) |
||||
combined_metrics.update(sensor_metrics) |
||||
combined_metrics.update(innov_fail_metrics) |
||||
combined_metrics.update(ekf_metrics) |
||||
|
||||
return combined_metrics |
||||
|
||||
|
||||
def calculate_sensor_metrics( |
||||
ulog: ULog, sensor_checks: List[str], in_air: InAirDetector, |
||||
in_air_no_ground_effects: InAirDetector, red_thresh: float = 1.0, |
||||
amb_thresh: float = 0.5) -> Dict[str, float]: |
||||
|
||||
estimator_status_data = ulog.get_dataset('estimator_status').data |
||||
|
||||
sensor_metrics = dict() |
||||
|
||||
# calculates peak, mean, percentage above 0.5 std, and percentage above std metrics for |
||||
# estimator status variables |
||||
for signal, result_id in [('hgt_test_ratio', 'hgt'), |
||||
('mag_test_ratio', 'mag'), |
||||
('vel_test_ratio', 'vel'), |
||||
('pos_test_ratio', 'pos'), |
||||
('tas_test_ratio', 'tas'), |
||||
('hagl_test_ratio', 'hagl')]: |
||||
|
||||
# only run sensor checks, if they apply. |
||||
if result_id in sensor_checks: |
||||
|
||||
if result_id == 'mag' or result_id == 'hgt': |
||||
in_air_detector = in_air_no_ground_effects |
||||
else: |
||||
in_air_detector = in_air |
||||
|
||||
# the percentage of samples above / below std dev |
||||
sensor_metrics['{:s}_percentage_red'.format(result_id)] = calculate_stat_from_signal( |
||||
estimator_status_data, 'estimator_status', signal, in_air_detector, |
||||
lambda x: 100.0 * np.mean(x > red_thresh)) |
||||
sensor_metrics['{:s}_percentage_amber'.format(result_id)] = calculate_stat_from_signal( |
||||
estimator_status_data, 'estimator_status', signal, in_air_detector, |
||||
lambda x: 100.0 * np.mean(x > amb_thresh)) - \ |
||||
sensor_metrics['{:s}_percentage_red'.format(result_id)] |
||||
|
||||
# the peak and mean ratio of samples above / below std dev |
||||
peak = calculate_stat_from_signal( |
||||
estimator_status_data, 'estimator_status', signal, in_air_detector, np.amax) |
||||
if peak > 0.0: |
||||
sensor_metrics['{:s}_test_max'.format(result_id)] = peak |
||||
sensor_metrics['{:s}_test_mean'.format(result_id)] = calculate_stat_from_signal( |
||||
estimator_status_data, 'estimator_status', signal, |
||||
in_air_detector, np.mean) |
||||
|
||||
return sensor_metrics |
||||
|
||||
|
||||
def calculate_innov_fail_metrics( |
||||
innov_flags: dict, innov_fail_checks: List[str], in_air: InAirDetector, |
||||
in_air_no_ground_effects: InAirDetector) -> dict: |
||||
""" |
||||
:param innov_flags: |
||||
:param innov_fail_checks: |
||||
:param in_air: |
||||
:param in_air_no_ground_effects: |
||||
:return: |
||||
""" |
||||
|
||||
innov_fail_metrics = dict() |
||||
|
||||
# calculate innovation check fail metrics |
||||
for signal_id, signal, result in [('posv', 'posv_innov_fail', 'hgt_fail_percentage'), |
||||
('magx', 'magx_innov_fail', 'magx_fail_percentage'), |
||||
('magy', 'magy_innov_fail', 'magy_fail_percentage'), |
||||
('magz', 'magz_innov_fail', 'magz_fail_percentage'), |
||||
('yaw', 'yaw_innov_fail', 'yaw_fail_percentage'), |
||||
('vel', 'vel_innov_fail', 'vel_fail_percentage'), |
||||
('posh', 'posh_innov_fail', 'pos_fail_percentage'), |
||||
('tas', 'tas_innov_fail', 'tas_fail_percentage'), |
||||
('hagl', 'hagl_innov_fail', 'hagl_fail_percentage'), |
||||
('ofx', 'ofx_innov_fail', 'ofx_fail_percentage'), |
||||
('ofy', 'ofy_innov_fail', 'ofy_fail_percentage')]: |
||||
|
||||
# only run innov fail checks, if they apply. |
||||
if signal_id in innov_fail_checks: |
||||
|
||||
if signal_id.startswith('mag') or signal_id == 'yaw' or signal_id == 'posv' or \ |
||||
signal_id.startswith('of'): |
||||
in_air_detector = in_air_no_ground_effects |
||||
else: |
||||
in_air_detector = in_air |
||||
|
||||
innov_fail_metrics[result] = calculate_stat_from_signal( |
||||
innov_flags, 'estimator_status', signal, in_air_detector, |
||||
lambda x: 100.0 * np.mean(x > 0.5)) |
||||
|
||||
return innov_fail_metrics |
||||
|
||||
|
||||
def calculate_imu_metrics( |
||||
ulog: ULog, in_air_no_ground_effects: InAirDetector) -> dict: |
||||
|
||||
ekf2_innovation_data = ulog.get_dataset('ekf2_innovations').data |
||||
|
||||
estimator_status_data = ulog.get_dataset('estimator_status').data |
||||
|
||||
imu_metrics = dict() |
||||
|
||||
# calculates the median of the output tracking error ekf innovations |
||||
for signal, result in [('output_tracking_error[0]', 'output_obs_ang_err_median'), |
||||
('output_tracking_error[1]', 'output_obs_vel_err_median'), |
||||
('output_tracking_error[2]', 'output_obs_pos_err_median')]: |
||||
imu_metrics[result] = calculate_stat_from_signal( |
||||
ekf2_innovation_data, 'ekf2_innovations', signal, in_air_no_ground_effects, np.median) |
||||
|
||||
# calculates peak and mean for IMU vibration checks |
||||
for signal, result in [('vibe[0]', 'imu_coning'), |
||||
('vibe[1]', 'imu_hfdang'), |
||||
('vibe[2]', 'imu_hfdvel')]: |
||||
peak = calculate_stat_from_signal( |
||||
estimator_status_data, 'estimator_status', signal, in_air_no_ground_effects, np.amax) |
||||
if peak > 0.0: |
||||
imu_metrics['{:s}_peak'.format(result)] = peak |
||||
imu_metrics['{:s}_mean'.format(result)] = calculate_stat_from_signal( |
||||
estimator_status_data, 'estimator_status', signal, |
||||
in_air_no_ground_effects, np.mean) |
||||
|
||||
# IMU bias checks |
||||
imu_metrics['imu_dang_bias_median'] = np.sqrt(np.sum([np.square(calculate_stat_from_signal( |
||||
estimator_status_data, 'estimator_status', signal, in_air_no_ground_effects, np.median)) |
||||
for signal in ['states[10]', 'states[11]', 'states[12]']])) |
||||
imu_metrics['imu_dvel_bias_median'] = np.sqrt(np.sum([np.square(calculate_stat_from_signal( |
||||
estimator_status_data, 'estimator_status', signal, in_air_no_ground_effects, np.median)) |
||||
for signal in ['states[13]', 'states[14]', 'states[15]']])) |
||||
|
||||
return imu_metrics |
||||
|
||||
|
||||
def calculate_stat_from_signal( |
||||
data: Dict[str, np.ndarray], dataset: str, variable: str, |
||||
in_air_det: InAirDetector, stat_function: Callable) -> float: |
||||
""" |
||||
:param data: |
||||
:param variable: |
||||
:param in_air_detector: |
||||
:return: |
||||
""" |
||||
|
||||
return stat_function(data[variable][in_air_det.get_airtime(dataset)]) |
@ -0,0 +1,142 @@
@@ -0,0 +1,142 @@
|
||||
#! /usr/bin/env python3 |
||||
""" |
||||
function collection for post-processing of ulog data. |
||||
""" |
||||
|
||||
from typing import Tuple |
||||
|
||||
import numpy as np |
||||
|
||||
|
||||
def get_estimator_check_flags(estimator_status: dict) -> Tuple[dict, dict, dict]: |
||||
""" |
||||
:param estimator_status: |
||||
:return: |
||||
""" |
||||
control_mode = get_control_mode_flags(estimator_status) |
||||
innov_flags = get_innovation_check_flags(estimator_status) |
||||
gps_fail_flags = get_gps_check_fail_flags(estimator_status) |
||||
return control_mode, innov_flags, gps_fail_flags |
||||
|
||||
|
||||
def get_control_mode_flags(estimator_status: dict) -> dict: |
||||
""" |
||||
:param estimator_status: |
||||
:return: |
||||
""" |
||||
|
||||
control_mode = dict() |
||||
# extract control mode metadata from estimator_status.control_mode_flags |
||||
# 0 - true if the filter tilt alignment is complete |
||||
# 1 - true if the filter yaw alignment is complete |
||||
# 2 - true if GPS measurements are being fused |
||||
# 3 - true if optical flow measurements are being fused |
||||
# 4 - true if a simple magnetic yaw heading is being fused |
||||
# 5 - true if 3-axis magnetometer measurement are being fused |
||||
# 6 - true if synthetic magnetic declination measurements are being fused |
||||
# 7 - true when the vehicle is airborne |
||||
# 8 - true when wind velocity is being estimated |
||||
# 9 - true when baro height is being fused as a primary height reference |
||||
# 10 - true when range finder height is being fused as a primary height reference |
||||
# 11 - true when range finder height is being fused as a primary height reference |
||||
# 12 - true when local position data from external vision is being fused |
||||
# 13 - true when yaw data from external vision measurements is being fused |
||||
# 14 - true when height data from external vision measurements is being fused |
||||
control_mode['tilt_aligned'] = ((2 ** 0 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['yaw_aligned'] = ((2 ** 1 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_gps'] = ((2 ** 2 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_optflow'] = ((2 ** 3 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_magyaw'] = ((2 ** 4 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_mag3d'] = ((2 ** 5 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_magdecl'] = ((2 ** 6 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['airborne'] = ((2 ** 7 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['estimating_wind'] = ((2 ** 8 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_barohgt'] = ((2 ** 9 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_rnghgt'] = ((2 ** 10 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_gpshgt'] = ((2 ** 11 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_evpos'] = ((2 ** 12 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_evyaw'] = ((2 ** 13 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
control_mode['using_evhgt'] = ((2 ** 14 & estimator_status['control_mode_flags']) > 0) * 1 |
||||
return control_mode |
||||
|
||||
|
||||
def get_innovation_check_flags(estimator_status: dict) -> dict: |
||||
""" |
||||
:param estimator_status: |
||||
:return: |
||||
""" |
||||
|
||||
innov_flags = dict() |
||||
# innovation_check_flags summary |
||||
# 0 - true if velocity observations have been rejected |
||||
# 1 - true if horizontal position observations have been rejected |
||||
# 2 - true if true if vertical position observations have been rejected |
||||
# 3 - true if the X magnetometer observation has been rejected |
||||
# 4 - true if the Y magnetometer observation has been rejected |
||||
# 5 - true if the Z magnetometer observation has been rejected |
||||
# 6 - true if the yaw observation has been rejected |
||||
# 7 - true if the airspeed observation has been rejected |
||||
# 8 - true if synthetic sideslip observation has been rejected |
||||
# 9 - true if the height above ground observation has been rejected |
||||
# 10 - true if the X optical flow observation has been rejected |
||||
# 11 - true if the Y optical flow observation has been rejected |
||||
innov_flags['vel_innov_fail'] = ((2 ** 0 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['posh_innov_fail'] = ((2 ** 1 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['posv_innov_fail'] = ((2 ** 2 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['magx_innov_fail'] = ((2 ** 3 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['magy_innov_fail'] = ((2 ** 4 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['magz_innov_fail'] = ((2 ** 5 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['yaw_innov_fail'] = ((2 ** 6 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['tas_innov_fail'] = ((2 ** 7 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['sli_innov_fail'] = ((2 ** 8 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['hagl_innov_fail'] = ((2 ** 9 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['ofx_innov_fail'] = ((2 ** 10 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
innov_flags['ofy_innov_fail'] = ((2 ** 11 & estimator_status['innovation_check_flags']) > 0) * 1 |
||||
return innov_flags |
||||
|
||||
|
||||
def get_gps_check_fail_flags(estimator_status: dict) -> dict: |
||||
""" |
||||
:param estimator_status: |
||||
:return: |
||||
""" |
||||
gps_fail_flags = dict() |
||||
|
||||
# 0 : insufficient fix type (no 3D solution) |
||||
# 1 : minimum required sat count fail |
||||
# 2 : minimum required GDoP fail |
||||
# 3 : maximum allowed horizontal position error fail |
||||
# 4 : maximum allowed vertical position error fail |
||||
# 5 : maximum allowed speed error fail |
||||
# 6 : maximum allowed horizontal position drift fail |
||||
# 7 : maximum allowed vertical position drift fail |
||||
# 8 : maximum allowed horizontal speed fail |
||||
# 9 : maximum allowed vertical velocity discrepancy fail |
||||
gps_fail_flags['gfix_fail'] = ((2 ** 0 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
gps_fail_flags['nsat_fail'] = ((2 ** 1 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
gps_fail_flags['gdop_fail'] = ((2 ** 2 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
gps_fail_flags['herr_fail'] = ((2 ** 3 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
gps_fail_flags['verr_fail'] = ((2 ** 4 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
gps_fail_flags['serr_fail'] = ((2 ** 5 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
gps_fail_flags['hdrift_fail'] = ((2 ** 6 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
gps_fail_flags['vdrift_fail'] = ((2 ** 7 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
gps_fail_flags['hspd_fail'] = ((2 ** 8 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
gps_fail_flags['veld_diff_fail'] = ((2 ** 9 & estimator_status['gps_check_fail_flags']) > 0) * 1 |
||||
return gps_fail_flags |
||||
|
||||
|
||||
def magnetic_field_estimates_from_status(estimator_status: dict) -> Tuple[float, float, float]: |
||||
""" |
||||
|
||||
:param estimator_status: |
||||
:return: |
||||
""" |
||||
rad2deg = 57.2958 |
||||
field_strength = np.sqrt( |
||||
estimator_status['states[16]'] ** 2 + estimator_status['states[17]'] ** 2 + |
||||
estimator_status['states[18]'] ** 2) |
||||
declination = rad2deg * np.arctan2(estimator_status['states[17]'], |
||||
estimator_status['states[16]']) |
||||
inclination = rad2deg * np.arcsin( |
||||
estimator_status['states[18]'] / np.maximum(field_strength, np.finfo(np.float32).eps)) |
||||
return declination, field_strength, inclination |
@ -1,47 +1,87 @@
@@ -1,47 +1,87 @@
|
||||
#!/usr/bin/env python |
||||
# -*- coding: utf-8 -*- |
||||
import argparse |
||||
import os, glob |
||||
|
||||
#! /usr/bin/env python3 |
||||
""" |
||||
Runs process_logdata_ekf.py on the .ulg files in the supplied directory. ulog files are skipped from the analysis, if a |
||||
corresponding .pdf file already exists (unless the overwrite flag was set). |
||||
""" |
||||
# -*- coding: utf-8 -*- |
||||
|
||||
import argparse |
||||
import os, glob |
||||
|
||||
from process_logdata_ekf import process_logdata_ekf |
||||
|
||||
parser = argparse.ArgumentParser(description='Analyse the estimator_status and ekf2_innovation message data for the' |
||||
def get_arguments(): |
||||
parser = argparse.ArgumentParser(description='Analyse the estimator_status and ekf2_innovation message data for the' |
||||
' .ulg files in the specified directory') |
||||
parser.add_argument("directory_path") |
||||
parser.add_argument('-o', '--overwrite', action='store_true', |
||||
parser.add_argument("directory_path") |
||||
parser.add_argument('-o', '--overwrite', action='store_true', |
||||
help='Whether to overwrite an already analysed file. If a file with .pdf extension exists for a .ulg' |
||||
'file, the log file will be skipped from analysis unless this flag has been set.') |
||||
parser.add_argument('--no-sensor-safety-margin', action='store_true', |
||||
parser.add_argument('--no-plots', action='store_true', |
||||
help='Whether to only analyse and not plot the summaries for developers.') |
||||
parser.add_argument('--check-level-thresholds', type=str, default=None, |
||||
help='The csv file of fail and warning test thresholds for analysis.') |
||||
parser.add_argument('--check-table', type=str, default=None, |
||||
help='The csv file with descriptions of the checks.') |
||||
parser.add_argument('--no-sensor-safety-margin', action='store_true', |
||||
help='Whether to not cut-off 5s after take-off and 5s before landing ' |
||||
'(for certain sensors that might be influence by proximity to ground).') |
||||
return parser.parse_args() |
||||
|
||||
|
||||
def main() -> None: |
||||
|
||||
args = get_arguments() |
||||
|
||||
def is_valid_directory(parser, arg): |
||||
if os.path.isdir(arg): |
||||
# Directory exists so return the directory |
||||
return arg |
||||
if args.check_level_thresholds is not None: |
||||
check_level_dict_filename = args.check_level_thresholds |
||||
else: |
||||
parser.error('The directory {} does not exist'.format(arg)) |
||||
file_dir = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) |
||||
check_level_dict_filename = os.path.join(file_dir, "check_level_dict.csv") |
||||
|
||||
args = parser.parse_args() |
||||
ulog_directory = args.directory_path |
||||
print("\n"+"analysing the .ulg files in "+ulog_directory) |
||||
if args.check_table is not None: |
||||
check_table_filename = args.check_table |
||||
else: |
||||
file_dir = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) |
||||
check_table_filename = os.path.join(file_dir, "check_table.csv") |
||||
|
||||
ulog_directory = args.directory_path |
||||
|
||||
# get all the ulog files found in the specified directory and in subdirectories |
||||
ulog_files = glob.glob(os.path.join(ulog_directory, '**/*.ulg'), recursive=True) |
||||
# get all the ulog files found in the specified directory and in subdirectories |
||||
ulog_files = glob.glob(os.path.join(ulog_directory, '**/*.ulg'), recursive=True) |
||||
print("found {:d} .ulg files in {:s}".format(len(ulog_files), ulog_directory)) |
||||
|
||||
# remove the files already analysed unless the overwrite flag was specified. A ulog file is consired to be analysed if |
||||
# a corresponding .pdf file exists.' |
||||
if not args.overwrite: |
||||
# remove the files already analysed unless the overwrite flag was specified. A ulog file is consired to be analysed if |
||||
# a corresponding .pdf file exists.' |
||||
if not args.overwrite: |
||||
print("skipping already analysed ulg files.") |
||||
ulog_files = [ulog_file for ulog_file in ulog_files if not os.path.exists('{}.pdf'.format(ulog_file))] |
||||
ulog_files = [ulog_file for ulog_file in ulog_files if |
||||
not os.path.exists('{}.pdf'.format(ulog_file))] |
||||
|
||||
# analyse all ulog files |
||||
for ulog_file in ulog_files: |
||||
print("\n"+"loading " + ulog_file + " for analysis") |
||||
if args.no_sensor_safety_margin: |
||||
os.system("python process_logdata_ekf.py {} --no-sensor-safety-margin".format(ulog_file)) |
||||
else: |
||||
os.system("python process_logdata_ekf.py {}".format(ulog_file)) |
||||
n_files = len(ulog_files) |
||||
|
||||
print("analysing the {:d} .ulg files".format(n_files)) |
||||
|
||||
i = 1 |
||||
n_skipped = 0 |
||||
# analyse all ulog files |
||||
for ulog_file in ulog_files: |
||||
print('analysing file {:d}/{:d}: {:s}'.format(i, n_files, ulog_file)) |
||||
|
||||
try: |
||||
_ = process_logdata_ekf( |
||||
ulog_file, check_level_dict_filename, check_table_filename, |
||||
plot=not args.no_plots, sensor_safety_margins=not args.no_sensor_safety_margin) |
||||
|
||||
except Exception as e: |
||||
print(str(e)) |
||||
print('an exception occurred, skipping file {:s}'.format(ulog_file)) |
||||
n_skipped = n_skipped + 1 |
||||
|
||||
i = i + 1 |
||||
|
||||
print('{:d}/{:d} files analysed, {:d} skipped.'.format(n_files-n_skipped, n_files, n_skipped)) |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
main() |
|
|
@ -0,0 +1,408 @@
@@ -0,0 +1,408 @@
|
||||
#! /usr/bin/env python3 |
||||
""" |
||||
function collection for plotting |
||||
""" |
||||
|
||||
from typing import Optional, List, Tuple, Dict |
||||
|
||||
import numpy as np |
||||
import matplotlib.pyplot as plt |
||||
from matplotlib.pyplot import Figure, Axes |
||||
from matplotlib.backends.backend_pdf import PdfPages |
||||
|
||||
|
||||
def get_min_arg_time_value( |
||||
time_series_data: np.ndarray, data_time: np.ndarray) -> Tuple[int, float, float]: |
||||
""" |
||||
:param time_series_data: |
||||
:param data_time: |
||||
:return: |
||||
""" |
||||
min_arg = np.argmin(time_series_data) |
||||
min_time = data_time[min_arg] |
||||
min_value = np.amin(time_series_data) |
||||
return (min_arg, min_value, min_time) |
||||
|
||||
|
||||
def get_max_arg_time_value( |
||||
time_series_data: np.ndarray, data_time: np.ndarray) -> Tuple[int, float, float]: |
||||
""" |
||||
:param time_series_data: |
||||
:param data_time: |
||||
:return: |
||||
""" |
||||
max_arg = np.argmax(time_series_data) |
||||
max_time = data_time[max_arg] |
||||
max_value = np.amax(time_series_data) |
||||
return max_arg, max_value, max_time |
||||
|
||||
|
||||
class DataPlot(): |
||||
""" |
||||
A plotting class interface. Provides functions such as saving the figure. |
||||
""" |
||||
def __init__( |
||||
self, plot_data: Dict[str, np.ndarray], variable_names: List[List[str]], |
||||
plot_title: str = '', sub_titles: Optional[List[str]] = None, |
||||
x_labels: Optional[List[str]] = None, y_labels: Optional[List[str]] = None, |
||||
y_lim: Optional[Tuple[int, int]] = None, legend: Optional[List[str]] = None, |
||||
pdf_handle: Optional[PdfPages] = None) -> None: |
||||
""" |
||||
Initializes the data plot class interface. |
||||
:param plot_title: |
||||
:param pdf_handle: |
||||
""" |
||||
self._plot_data = plot_data |
||||
self._variable_names = variable_names |
||||
self._plot_title = plot_title |
||||
self._sub_titles = sub_titles |
||||
self._x_labels = x_labels |
||||
self._y_labels = y_labels |
||||
self._y_lim = y_lim |
||||
self._legend = legend |
||||
self._pdf_handle = pdf_handle |
||||
self._fig = None |
||||
self._ax = None |
||||
self._fig_size = (20, 13) |
||||
|
||||
@property |
||||
def fig(self) -> Figure: |
||||
""" |
||||
:return: the figure handle |
||||
""" |
||||
if self._fig is None: |
||||
self._create_figure() |
||||
return self._fig |
||||
|
||||
@property |
||||
def ax(self) -> Axes: |
||||
""" |
||||
:return: the axes handle |
||||
""" |
||||
if self._ax is None: |
||||
self._create_figure() |
||||
return self._ax |
||||
|
||||
@property |
||||
def plot_data(self) -> dict: |
||||
""" |
||||
returns the plot data. calls _generate_plot_data if necessary. |
||||
:return: |
||||
""" |
||||
if self._plot_data is None: |
||||
self._generate_plot_data() |
||||
return self._plot_data |
||||
|
||||
def plot(self) -> None: |
||||
""" |
||||
placeholder for the plotting function. A child class should implement this function. |
||||
:return: |
||||
""" |
||||
|
||||
def _create_figure(self) -> None: |
||||
""" |
||||
creates the figure handle. |
||||
:return: |
||||
""" |
||||
self._fig, self._ax = plt.subplots(frameon=True, figsize=self._fig_size) |
||||
self._fig.suptitle(self._plot_title) |
||||
|
||||
|
||||
def _generate_plot_data(self) -> None: |
||||
""" |
||||
placeholder for a function that generates a data table necessary for plotting |
||||
:return: |
||||
""" |
||||
|
||||
def show(self) -> None: |
||||
""" |
||||
displays the figure on the screen. |
||||
:return: None |
||||
""" |
||||
self.fig.show() |
||||
|
||||
|
||||
def save(self) -> None: |
||||
""" |
||||
saves the figure if a pdf_handle was initialized. |
||||
:return: |
||||
""" |
||||
|
||||
if self._pdf_handle is not None and self.fig is not None: |
||||
self.plot() |
||||
self._pdf_handle.savefig(figure=self.fig) |
||||
else: |
||||
print('skipping saving to pdf: handle was not initialized.') |
||||
|
||||
|
||||
def close(self) -> None: |
||||
""" |
||||
closes the figure. |
||||
:return: |
||||
""" |
||||
plt.close(self._fig) |
||||
|
||||
|
||||
class TimeSeriesPlot(DataPlot): |
||||
""" |
||||
class for creating multiple time series plot. |
||||
""" |
||||
def __init__( |
||||
self, plot_data: dict, variable_names: List[List[str]], x_labels: List[str], |
||||
y_labels: List[str], plot_title: str = '', sub_titles: Optional[List[str]] = None, |
||||
pdf_handle: Optional[PdfPages] = None) -> None: |
||||
""" |
||||
initializes a timeseries plot |
||||
:param plot_data: |
||||
:param variable_names: |
||||
:param xlabels: |
||||
:param ylabels: |
||||
:param plot_title: |
||||
:param pdf_handle: |
||||
""" |
||||
super().__init__( |
||||
plot_data, variable_names, plot_title=plot_title, sub_titles=sub_titles, |
||||
x_labels=x_labels, y_labels=y_labels, pdf_handle=pdf_handle) |
||||
|
||||
def plot(self): |
||||
""" |
||||
plots the time series data. |
||||
:return: |
||||
""" |
||||
if self.fig is None: |
||||
return |
||||
|
||||
for i in range(len(self._variable_names)): |
||||
plt.subplot(len(self._variable_names), 1, i + 1) |
||||
for v in self._variable_names[i]: |
||||
plt.plot(self.plot_data[v], 'b') |
||||
plt.xlabel(self._x_labels[i]) |
||||
plt.ylabel(self._y_labels[i]) |
||||
|
||||
self.fig.tight_layout(rect=[0, 0.03, 1, 0.95]) |
||||
|
||||
|
||||
class InnovationPlot(DataPlot): |
||||
""" |
||||
class for creating an innovation plot. |
||||
""" |
||||
def __init__( |
||||
self, plot_data: dict, variable_names: List[Tuple[str, str]], x_labels: List[str], |
||||
y_labels: List[str], plot_title: str = '', sub_titles: Optional[List[str]] = None, |
||||
pdf_handle: Optional[PdfPages] = None) -> None: |
||||
""" |
||||
initializes a timeseries plot |
||||
:param plot_data: |
||||
:param variable_names: |
||||
:param xlabels: |
||||
:param ylabels: |
||||
:param plot_title: |
||||
:param sub_titles: |
||||
:param pdf_handle: |
||||
""" |
||||
super().__init__( |
||||
plot_data, variable_names, plot_title=plot_title, sub_titles=sub_titles, |
||||
x_labels=x_labels, y_labels=y_labels, pdf_handle=pdf_handle) |
||||
|
||||
|
||||
def plot(self): |
||||
""" |
||||
plots the Innovation data. |
||||
:return: |
||||
""" |
||||
|
||||
if self.fig is None: |
||||
return |
||||
|
||||
for i in range(len(self._variable_names)): |
||||
# create a subplot for every variable |
||||
plt.subplot(len(self._variable_names), 1, i + 1) |
||||
if self._sub_titles is not None: |
||||
plt.title(self._sub_titles[i]) |
||||
|
||||
# plot the value and the standard deviation |
||||
plt.plot( |
||||
1e-6 * self.plot_data['timestamp'], self.plot_data[self._variable_names[i][0]], 'b') |
||||
plt.plot( |
||||
1e-6 * self.plot_data['timestamp'], |
||||
np.sqrt(self.plot_data[self._variable_names[i][1]]), 'r') |
||||
plt.plot( |
||||
1e-6 * self.plot_data['timestamp'], |
||||
-np.sqrt(self.plot_data[self._variable_names[i][1]]), 'r') |
||||
|
||||
plt.xlabel(self._x_labels[i]) |
||||
plt.ylabel(self._y_labels[i]) |
||||
plt.grid() |
||||
|
||||
# add the maximum and minimum value as an annotation |
||||
_, max_value, max_time = get_max_arg_time_value( |
||||
self.plot_data[self._variable_names[i][0]], 1e-6 * self.plot_data['timestamp']) |
||||
_, min_value, min_time = get_min_arg_time_value( |
||||
self.plot_data[self._variable_names[i][0]], 1e-6 * self.plot_data['timestamp']) |
||||
|
||||
plt.text( |
||||
max_time, max_value, 'max={:.2f}'.format(max_value), fontsize=12, |
||||
horizontalalignment='left', |
||||
verticalalignment='bottom') |
||||
plt.text( |
||||
min_time, min_value, 'min={:.2f}'.format(min_value), fontsize=12, |
||||
horizontalalignment='left', |
||||
verticalalignment='top') |
||||
|
||||
self.fig.tight_layout(rect=[0, 0.03, 1, 0.95]) |
||||
|
||||
|
||||
class ControlModeSummaryPlot(DataPlot): |
||||
""" |
||||
class for creating a control mode summary plot. |
||||
""" |
||||
|
||||
def __init__( |
||||
self, data_time: np.ndarray, plot_data: dict, variable_names: List[List[str]], |
||||
x_label: str, y_labels: List[str], annotation_text: List[str], |
||||
additional_annotation: Optional[List[str]] = None, plot_title: str = '', |
||||
sub_titles: Optional[List[str]] = None, |
||||
pdf_handle: Optional[PdfPages] = None) -> None: |
||||
""" |
||||
initializes a timeseries plot |
||||
:param plot_data: |
||||
:param variable_names: |
||||
:param xlabels: |
||||
:param ylabels: |
||||
:param plot_title: |
||||
:param sub_titles: |
||||
:param pdf_handle: |
||||
""" |
||||
super().__init__( |
||||
plot_data, variable_names, plot_title=plot_title, sub_titles=sub_titles, |
||||
x_labels=[x_label]*len(y_labels), y_labels=y_labels, pdf_handle=pdf_handle) |
||||
self._data_time = data_time |
||||
self._annotation_text = annotation_text |
||||
self._additional_annotation = additional_annotation |
||||
|
||||
|
||||
def plot(self): |
||||
""" |
||||
plots the control mode data. |
||||
:return: |
||||
""" |
||||
|
||||
if self.fig is None: |
||||
return |
||||
|
||||
colors = ['b', 'r', 'g', 'c'] |
||||
|
||||
for i in range(len(self._variable_names)): |
||||
# create a subplot for every variable |
||||
plt.subplot(len(self._variable_names), 1, i + 1) |
||||
if self._sub_titles is not None: |
||||
plt.title(self._sub_titles[i]) |
||||
|
||||
for col, var in zip(colors[:len(self._variable_names[i])], self._variable_names[i]): |
||||
plt.plot(self._data_time, self.plot_data[var], col) |
||||
|
||||
plt.xlabel(self._x_labels[i]) |
||||
plt.ylabel(self._y_labels[i]) |
||||
plt.grid() |
||||
plt.ylim(-0.1, 1.1) |
||||
|
||||
for t in range(len(self._annotation_text[i])): |
||||
|
||||
_, _, align_time = get_max_arg_time_value( |
||||
np.diff(self.plot_data[self._variable_names[i][t]]), self._data_time) |
||||
v_annot_pos = (t+1.0)/(len(self._variable_names[i])+1) # vert annotation position |
||||
|
||||
if np.amin(self.plot_data[self._variable_names[i][t]]) > 0: |
||||
plt.text( |
||||
align_time, v_annot_pos, |
||||
'no pre-arm data - cannot calculate {:s} start time'.format( |
||||
self._annotation_text[i][t]), fontsize=12, horizontalalignment='left', |
||||
verticalalignment='center', color=colors[t]) |
||||
elif np.amax(self.plot_data[self._variable_names[i][t]]) > 0: |
||||
plt.text( |
||||
align_time, v_annot_pos, '{:s} at {:.1f} sec'.format( |
||||
self._annotation_text[i][t], align_time), fontsize=12, |
||||
horizontalalignment='left', verticalalignment='center', color=colors[t]) |
||||
|
||||
if self._additional_annotation is not None: |
||||
for a in range(len(self._additional_annotation[i])): |
||||
v_annot_pos = (a + 1.0) / (len(self._additional_annotation[i]) + 1) |
||||
plt.text( |
||||
self._additional_annotation[i][a][0], v_annot_pos, |
||||
self._additional_annotation[i][a][1], fontsize=12, |
||||
horizontalalignment='left', verticalalignment='center', color='b') |
||||
|
||||
self.fig.tight_layout(rect=[0, 0.03, 1, 0.95]) |
||||
|
||||
|
||||
class CheckFlagsPlot(DataPlot): |
||||
""" |
||||
class for creating a control mode summary plot. |
||||
""" |
||||
|
||||
def __init__( |
||||
self, data_time: np.ndarray, plot_data: dict, variable_names: List[List[str]], |
||||
x_label: str, y_labels: List[str], y_lim: Optional[Tuple[int, int]] = None, |
||||
plot_title: str = '', legend: Optional[List[str]] = None, |
||||
sub_titles: Optional[List[str]] = None, pdf_handle: Optional[PdfPages] = None, |
||||
annotate: bool = False) -> None: |
||||
""" |
||||
initializes a timeseries plot |
||||
:param plot_data: |
||||
:param variable_names: |
||||
:param xlabels: |
||||
:param ylabels: |
||||
:param plot_title: |
||||
:param sub_titles: |
||||
:param pdf_handle: |
||||
""" |
||||
super().__init__( |
||||
plot_data, variable_names, plot_title=plot_title, sub_titles=sub_titles, |
||||
x_labels=[x_label]*len(y_labels), y_labels=y_labels, y_lim=y_lim, legend=legend, |
||||
pdf_handle=pdf_handle) |
||||
self._data_time = data_time |
||||
self._b_annotate = annotate |
||||
|
||||
|
||||
def plot(self): |
||||
""" |
||||
plots the control mode data. |
||||
:return: |
||||
""" |
||||
|
||||
if self.fig is None: |
||||
return |
||||
|
||||
colors = ['b', 'r', 'g', 'c', 'k', 'm'] |
||||
|
||||
for i in range(len(self._variable_names)): |
||||
# create a subplot for every variable |
||||
plt.subplot(len(self._variable_names), 1, i + 1) |
||||
if self._sub_titles is not None: |
||||
plt.title(self._sub_titles[i]) |
||||
|
||||
for col, var in zip(colors[:len(self._variable_names[i])], self._variable_names[i]): |
||||
plt.plot(self._data_time, self.plot_data[var], col) |
||||
|
||||
plt.xlabel(self._x_labels[i]) |
||||
plt.ylabel(self._y_labels[i]) |
||||
plt.grid() |
||||
if self._y_lim is not None: |
||||
plt.ylim(self._y_lim) |
||||
|
||||
if self._legend is not None: |
||||
plt.legend(self._legend[i], loc='upper left') |
||||
|
||||
if self._b_annotate: |
||||
for col, var in zip(colors[:len(self._variable_names[i])], self._variable_names[i]): |
||||
# add the maximum and minimum value as an annotation |
||||
_, max_value, max_time = get_max_arg_time_value( |
||||
self.plot_data[var], self._data_time) |
||||
mean_value = np.mean(self.plot_data[var]) |
||||
|
||||
plt.text( |
||||
max_time, max_value, |
||||
'max={:.4f}, mean={:.4f}'.format(max_value, mean_value), color=col, |
||||
fontsize=12, horizontalalignment='left', verticalalignment='bottom') |
||||
|
||||
self.fig.tight_layout(rect=[0, 0.03, 1, 0.95]) |
@ -0,0 +1,353 @@
@@ -0,0 +1,353 @@
|
||||
#! /usr/bin/env python3 |
||||
""" |
||||
function collection for plotting |
||||
""" |
||||
|
||||
# matplotlib don't use Xwindows backend (must be before pyplot import) |
||||
import matplotlib |
||||
matplotlib.use('Agg') |
||||
|
||||
import numpy as np |
||||
from matplotlib.backends.backend_pdf import PdfPages |
||||
from pyulog import ULog |
||||
|
||||
from analysis.post_processing import magnetic_field_estimates_from_status, get_estimator_check_flags |
||||
from plotting.data_plots import TimeSeriesPlot, InnovationPlot, ControlModeSummaryPlot, \ |
||||
CheckFlagsPlot |
||||
from analysis.detectors import PreconditionError |
||||
|
||||
def create_pdf_report(ulog: ULog, output_plot_filename: str) -> None: |
||||
""" |
||||
creates a pdf report of the ekf analysis. |
||||
:param ulog: |
||||
:param output_plot_filename: |
||||
:return: |
||||
""" |
||||
|
||||
# create summary plots |
||||
# save the plots to PDF |
||||
|
||||
try: |
||||
estimator_status = ulog.get_dataset('estimator_status').data |
||||
print('found estimator_status data') |
||||
except: |
||||
raise PreconditionError('could not find estimator_status data') |
||||
|
||||
try: |
||||
ekf2_innovations = ulog.get_dataset('ekf2_innovations').data |
||||
print('found ekf2_innovation data') |
||||
except: |
||||
raise PreconditionError('could not find ekf2_innovation data') |
||||
|
||||
try: |
||||
sensor_preflight = ulog.get_dataset('sensor_preflight').data |
||||
print('found sensor_preflight data') |
||||
except: |
||||
raise PreconditionError('could not find sensor_preflight data') |
||||
|
||||
control_mode, innov_flags, gps_fail_flags = get_estimator_check_flags(estimator_status) |
||||
|
||||
status_time = 1e-6 * estimator_status['timestamp'] |
||||
|
||||
b_finishes_in_air, b_starts_in_air, in_air_duration, in_air_transition_time, \ |
||||
on_ground_transition_time = detect_airtime(control_mode, status_time) |
||||
|
||||
with PdfPages(output_plot_filename) as pdf_pages: |
||||
|
||||
# plot IMU consistency data |
||||
if ('accel_inconsistency_m_s_s' in sensor_preflight.keys()) and ( |
||||
'gyro_inconsistency_rad_s' in sensor_preflight.keys()): |
||||
data_plot = TimeSeriesPlot( |
||||
sensor_preflight, [['accel_inconsistency_m_s_s'], ['gyro_inconsistency_rad_s']], |
||||
x_labels=['data index', 'data index'], |
||||
y_labels=['acceleration (m/s/s)', 'angular rate (rad/s)'], |
||||
plot_title='IMU Consistency Check Levels', pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# vertical velocity and position innovations |
||||
data_plot = InnovationPlot( |
||||
ekf2_innovations, [('vel_pos_innov[2]', 'vel_pos_innov_var[2]'), |
||||
('vel_pos_innov[5]', 'vel_pos_innov_var[5]')], |
||||
x_labels=['time (sec)', 'time (sec)'], |
||||
y_labels=['Down Vel (m/s)', 'Down Pos (m)'], plot_title='Vertical Innovations', |
||||
pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# horizontal velocity innovations |
||||
data_plot = InnovationPlot( |
||||
ekf2_innovations, [('vel_pos_innov[0]', 'vel_pos_innov_var[0]'), |
||||
('vel_pos_innov[1]','vel_pos_innov_var[1]')], |
||||
x_labels=['time (sec)', 'time (sec)'], |
||||
y_labels=['North Vel (m/s)', 'East Vel (m/s)'], |
||||
plot_title='Horizontal Velocity Innovations', pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# horizontal position innovations |
||||
data_plot = InnovationPlot( |
||||
ekf2_innovations, [('vel_pos_innov[3]', 'vel_pos_innov_var[3]'), ('vel_pos_innov[4]', |
||||
'vel_pos_innov_var[4]')], |
||||
x_labels=['time (sec)', 'time (sec)'], |
||||
y_labels=['North Pos (m)', 'East Pos (m)'], plot_title='Horizontal Position Innovations', |
||||
pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# magnetometer innovations |
||||
data_plot = InnovationPlot( |
||||
ekf2_innovations, [('mag_innov[0]', 'mag_innov_var[0]'), |
||||
('mag_innov[1]', 'mag_innov_var[1]'), ('mag_innov[2]', 'mag_innov_var[2]')], |
||||
x_labels=['time (sec)', 'time (sec)', 'time (sec)'], |
||||
y_labels=['X (Gauss)', 'Y (Gauss)', 'Z (Gauss)'], plot_title='Magnetometer Innovations', |
||||
pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# magnetic heading innovations |
||||
data_plot = InnovationPlot( |
||||
ekf2_innovations, [('heading_innov', 'heading_innov_var')], |
||||
x_labels=['time (sec)'], y_labels=['Heading (rad)'], |
||||
plot_title='Magnetic Heading Innovations', pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# air data innovations |
||||
data_plot = InnovationPlot( |
||||
ekf2_innovations, |
||||
[('airspeed_innov', 'airspeed_innov_var'), ('beta_innov', 'beta_innov_var')], |
||||
x_labels=['time (sec)', 'time (sec)'], |
||||
y_labels=['innovation (m/sec)', 'innovation (rad)'], |
||||
sub_titles=['True Airspeed Innovations', 'Synthetic Sideslip Innovations'], |
||||
pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# optical flow innovations |
||||
data_plot = InnovationPlot( |
||||
ekf2_innovations, [('flow_innov[0]', 'flow_innov_var[0]'), ('flow_innov[1]', |
||||
'flow_innov_var[1]')], |
||||
x_labels=['time (sec)', 'time (sec)'], |
||||
y_labels=['X (rad/sec)', 'Y (rad/sec)'], |
||||
plot_title='Optical Flow Innovations', pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# plot normalised innovation test levels |
||||
# define variables to plot |
||||
variables = [['mag_test_ratio'], ['vel_test_ratio', 'pos_test_ratio'], ['hgt_test_ratio']] |
||||
y_labels = ['mag', 'vel, pos', 'hgt'] |
||||
legend = [['mag'], ['vel', 'pos'], ['hgt']] |
||||
if np.amax(estimator_status['hagl_test_ratio']) > 0.0: # plot hagl test ratio, if applicable |
||||
variables[-1].append('hagl_test_ratio') |
||||
y_labels[-1] += ', hagl' |
||||
legend[-1].append('hagl') |
||||
|
||||
if np.amax(estimator_status[ |
||||
'tas_test_ratio']) > 0.0: # plot airspeed sensor test ratio, if applicable |
||||
variables.append(['tas_test_ratio']) |
||||
y_labels.append('TAS') |
||||
legend.append(['airspeed']) |
||||
|
||||
data_plot = CheckFlagsPlot( |
||||
status_time, estimator_status, variables, x_label='time (sec)', y_labels=y_labels, |
||||
plot_title='Normalised Innovation Test Levels', pdf_handle=pdf_pages, annotate=True, |
||||
legend=legend |
||||
) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# plot control mode summary A |
||||
data_plot = ControlModeSummaryPlot( |
||||
status_time, control_mode, [['tilt_aligned', 'yaw_aligned'], |
||||
['using_gps', 'using_optflow', 'using_evpos'], ['using_barohgt', 'using_gpshgt', |
||||
'using_rnghgt', 'using_evhgt'], ['using_magyaw', 'using_mag3d', 'using_magdecl']], |
||||
x_label='time (sec)', y_labels=['aligned', 'pos aiding', 'hgt aiding', 'mag aiding'], |
||||
annotation_text=[['tilt alignment', 'yaw alignment'], ['GPS aiding', 'optical flow aiding', |
||||
'external vision aiding'], ['Baro aiding', 'GPS aiding', 'rangefinder aiding', |
||||
'external vision aiding'], ['magnetic yaw aiding', '3D magnetoemter aiding', |
||||
'magnetic declination aiding']], plot_title='EKF Control Status - Figure A', |
||||
pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# plot control mode summary B |
||||
# construct additional annotations for the airborne plot |
||||
airborne_annotations = list() |
||||
if np.amin(np.diff(control_mode['airborne'])) > -0.5: |
||||
airborne_annotations.append( |
||||
(on_ground_transition_time, 'air to ground transition not detected')) |
||||
else: |
||||
airborne_annotations.append((on_ground_transition_time, 'on-ground at {:.1f} sec'.format( |
||||
on_ground_transition_time))) |
||||
if in_air_duration > 0.0: |
||||
airborne_annotations.append(((in_air_transition_time + on_ground_transition_time) / 2, |
||||
'duration = {:.1f} sec'.format(in_air_duration))) |
||||
if np.amax(np.diff(control_mode['airborne'])) < 0.5: |
||||
airborne_annotations.append( |
||||
(in_air_transition_time, 'ground to air transition not detected')) |
||||
else: |
||||
airborne_annotations.append( |
||||
(in_air_transition_time, 'in-air at {:.1f} sec'.format(in_air_transition_time))) |
||||
|
||||
data_plot = ControlModeSummaryPlot( |
||||
status_time, control_mode, [['airborne'], ['estimating_wind']], |
||||
x_label='time (sec)', y_labels=['airborne', 'estimating wind'], annotation_text=[[], []], |
||||
additional_annotation=[airborne_annotations, []], |
||||
plot_title='EKF Control Status - Figure B', pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# plot innovation_check_flags summary |
||||
data_plot = CheckFlagsPlot( |
||||
status_time, innov_flags, [['vel_innov_fail', 'posh_innov_fail'], ['posv_innov_fail', |
||||
'hagl_innov_fail'], |
||||
['magx_innov_fail', 'magy_innov_fail', 'magz_innov_fail', |
||||
'yaw_innov_fail'], ['tas_innov_fail'], ['sli_innov_fail'], |
||||
['ofx_innov_fail', |
||||
'ofy_innov_fail']], x_label='time (sec)', |
||||
y_labels=['failed', 'failed', 'failed', 'failed', 'failed', 'failed'], |
||||
y_lim=(-0.1, 1.1), |
||||
legend=[['vel NED', 'pos NE'], ['hgt absolute', 'hgt above ground'], |
||||
['mag_x', 'mag_y', 'mag_z', 'yaw'], ['airspeed'], ['sideslip'], |
||||
['flow X', 'flow Y']], |
||||
plot_title='EKF Innovation Test Fails', annotate=False, pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# gps_check_fail_flags summary |
||||
data_plot = CheckFlagsPlot( |
||||
status_time, gps_fail_flags, |
||||
[['nsat_fail', 'gdop_fail', 'herr_fail', 'verr_fail', 'gfix_fail', 'serr_fail'], |
||||
['hdrift_fail', 'vdrift_fail', 'hspd_fail', 'veld_diff_fail']], |
||||
x_label='time (sec)', y_lim=(-0.1, 1.1), y_labels=['failed', 'failed'], |
||||
sub_titles=['GPS Direct Output Check Failures', 'GPS Derived Output Check Failures'], |
||||
legend=[['N sats', 'GDOP', 'horiz pos error', 'vert pos error', 'fix type', |
||||
'speed error'], ['horiz drift', 'vert drift', 'horiz speed', |
||||
'vert vel inconsistent']], annotate=False, pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# filter reported accuracy |
||||
data_plot = CheckFlagsPlot( |
||||
status_time, estimator_status, [['pos_horiz_accuracy', 'pos_vert_accuracy']], |
||||
x_label='time (sec)', y_labels=['accuracy (m)'], plot_title='Reported Accuracy', |
||||
legend=[['horizontal', 'vertical']], annotate=False, pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# Plot the EKF IMU vibration metrics |
||||
scaled_estimator_status = {'vibe[0]': 1000. * estimator_status['vibe[0]'], |
||||
'vibe[1]': 1000. * estimator_status['vibe[1]'], |
||||
'vibe[2]': estimator_status['vibe[2]'] |
||||
} |
||||
data_plot = CheckFlagsPlot( |
||||
status_time, scaled_estimator_status, [['vibe[0]'], ['vibe[1]'], ['vibe[2]']], |
||||
x_label='time (sec)', y_labels=['Del Ang Coning (mrad)', 'HF Del Ang (mrad)', |
||||
'HF Del Vel (m/s)'], plot_title='IMU Vibration Metrics', |
||||
pdf_handle=pdf_pages, annotate=True) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# Plot the EKF output observer tracking errors |
||||
scaled_innovations = { |
||||
'output_tracking_error[0]': 1000. * ekf2_innovations['output_tracking_error[0]'], |
||||
'output_tracking_error[1]': ekf2_innovations['output_tracking_error[1]'], |
||||
'output_tracking_error[2]': ekf2_innovations['output_tracking_error[2]'] |
||||
} |
||||
data_plot = CheckFlagsPlot( |
||||
1e-6 * ekf2_innovations['timestamp'], scaled_innovations, |
||||
[['output_tracking_error[0]'], ['output_tracking_error[1]'], |
||||
['output_tracking_error[2]']], x_label='time (sec)', |
||||
y_labels=['angles (mrad)', 'velocity (m/s)', 'position (m)'], |
||||
plot_title='Output Observer Tracking Error Magnitudes', |
||||
pdf_handle=pdf_pages, annotate=True) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# Plot the delta angle bias estimates |
||||
data_plot = CheckFlagsPlot( |
||||
1e-6 * estimator_status['timestamp'], estimator_status, |
||||
[['states[10]'], ['states[11]'], ['states[12]']], |
||||
x_label='time (sec)', y_labels=['X (rad)', 'Y (rad)', 'Z (rad)'], |
||||
plot_title='Delta Angle Bias Estimates', annotate=False, pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# Plot the delta velocity bias estimates |
||||
data_plot = CheckFlagsPlot( |
||||
1e-6 * estimator_status['timestamp'], estimator_status, |
||||
[['states[13]'], ['states[14]'], ['states[15]']], |
||||
x_label='time (sec)', y_labels=['X (m/s)', 'Y (m/s)', 'Z (m/s)'], |
||||
plot_title='Delta Velocity Bias Estimates', annotate=False, pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# Plot the earth frame magnetic field estimates |
||||
declination, field_strength, inclination = magnetic_field_estimates_from_status( |
||||
estimator_status) |
||||
data_plot = CheckFlagsPlot( |
||||
1e-6 * estimator_status['timestamp'], |
||||
{'strength': field_strength, 'declination': declination, 'inclination': inclination}, |
||||
[['declination'], ['inclination'], ['strength']], |
||||
x_label='time (sec)', y_labels=['declination (deg)', 'inclination (deg)', |
||||
'strength (Gauss)'], |
||||
plot_title='Earth Magnetic Field Estimates', annotate=False, |
||||
pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# Plot the body frame magnetic field estimates |
||||
data_plot = CheckFlagsPlot( |
||||
1e-6 * estimator_status['timestamp'], estimator_status, |
||||
[['states[19]'], ['states[20]'], ['states[21]']], |
||||
x_label='time (sec)', y_labels=['X (Gauss)', 'Y (Gauss)', 'Z (Gauss)'], |
||||
plot_title='Magnetometer Bias Estimates', annotate=False, pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
# Plot the EKF wind estimates |
||||
data_plot = CheckFlagsPlot( |
||||
1e-6 * estimator_status['timestamp'], estimator_status, |
||||
[['states[22]'], ['states[23]']], x_label='time (sec)', |
||||
y_labels=['North (m/s)', 'East (m/s)'], plot_title='Wind Velocity Estimates', |
||||
annotate=False, pdf_handle=pdf_pages) |
||||
data_plot.save() |
||||
data_plot.close() |
||||
|
||||
|
||||
def detect_airtime(control_mode, status_time): |
||||
# define flags for starting and finishing in air |
||||
b_starts_in_air = False |
||||
b_finishes_in_air = False |
||||
# calculate in-air transition time |
||||
if (np.amin(control_mode['airborne']) < 0.5) and (np.amax(control_mode['airborne']) > 0.5): |
||||
in_air_transtion_time_arg = np.argmax(np.diff(control_mode['airborne'])) |
||||
in_air_transition_time = status_time[in_air_transtion_time_arg] |
||||
elif (np.amax(control_mode['airborne']) > 0.5): |
||||
in_air_transition_time = np.amin(status_time) |
||||
print('log starts while in-air at ' + str(round(in_air_transition_time, 1)) + ' sec') |
||||
b_starts_in_air = True |
||||
else: |
||||
in_air_transition_time = float('NaN') |
||||
print('always on ground') |
||||
# calculate on-ground transition time |
||||
if (np.amin(np.diff(control_mode['airborne'])) < 0.0): |
||||
on_ground_transition_time_arg = np.argmin(np.diff(control_mode['airborne'])) |
||||
on_ground_transition_time = status_time[on_ground_transition_time_arg] |
||||
elif (np.amax(control_mode['airborne']) > 0.5): |
||||
on_ground_transition_time = np.amax(status_time) |
||||
print('log finishes while in-air at ' + str(round(on_ground_transition_time, 1)) + ' sec') |
||||
b_finishes_in_air = True |
||||
else: |
||||
on_ground_transition_time = float('NaN') |
||||
print('always on ground') |
||||
if (np.amax(np.diff(control_mode['airborne'])) > 0.5) and (np.amin(np.diff(control_mode['airborne'])) < -0.5): |
||||
if ((on_ground_transition_time - in_air_transition_time) > 0.0): |
||||
in_air_duration = on_ground_transition_time - in_air_transition_time |
||||
else: |
||||
in_air_duration = float('NaN') |
||||
else: |
||||
in_air_duration = float('NaN') |
||||
return b_finishes_in_air, b_starts_in_air, in_air_duration, in_air_transition_time, on_ground_transition_time |
Loading…
Reference in new issue