You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

295 lines
15 KiB

from LogAnalyzer import Test,TestResult
import DataflashLog
from math import sqrt
import numpy as np
import matplotlib.pyplot as plt
class TestFlow(Test):
'''test optical flow sensor scale factor calibration'''
#
# Use the following procedure to log the calibration data. is assumed that the optical flow sensor has been
# correctly aligned, is focussed and the test is performed over a textured surface with adequate lighting.
# Note that the strobing effect from non incandescent artifical lighting can produce poor optical flow measurements.
#
# 1) Set LOG_DISARMED and FLOW_TYPE to 10 and verify that ATT and OF messages are being logged onboard
# 2) Place on level ground, apply power and wait for EKF to complete attitude alignment
# 3) Keeping the copter level, lift it to shoulder height and rock between +-20 and +-30 degrees
# in roll about an axis that passes through the flow sensor lens assembly. The time taken to rotate from
# maximum left roll to maximum right roll should be about 1 second.
# 4) Repeat 3) about the pitch axis
# 5) Holding the copter level, lower it to the ground and remove power
# 6) Transfer the logfile from the sdcard.
# 7) Open a terminal and cd to the ardupilot/Tools/LogAnalyzer directory
# 8) Enter to run the analysis 'python LogAnalyzer.py <log file name including full path>'
# 9) Check the OpticalFlow test status printed to the screen. The analysis plots are saved to
# flow_calibration.pdf and the recommended scale factors to flow_calibration.param
def __init__(self):
Test.__init__(self)
self.name = "OpticalFlow"
def run(self, logdata, verbose):
self.result = TestResult()
self.result.status = TestResult.StatusType.GOOD
def FAIL():
self.result.status = TestResult.StatusType.FAIL
def WARN():
if self.result.status != TestResult.StatusType.FAIL:
self.result.status = TestResult.StatusType.WARN
try:
# tuning parameters used by the algorithm
tilt_threshold = 15 # roll and pitch threshold used to start and stop calibration (deg)
quality_threshold = 124 # minimum flow quality required for data to be used by the curve fit (N/A)
min_rate_threshold = 0.0 # if the gyro rate is less than this, the data will not be used by the curve fit (rad/sec)
max_rate_threshold = 2.0 # if the gyro rate is greter than this, the data will not be used by the curve fit (rad/sec)
param_std_threshold = 5.0 # maximum allowable 1-std uncertainty in scaling parameter (scale factor * 1000)
param_abs_threshold = 200 # max/min allowable scale factor parameter. Values of FLOW_FXSCALER and FLOW_FYSCALER outside the range of +-param_abs_threshold indicate a sensor configuration problem.
min_num_points = 100 # minimum number of points required for a curve fit - this is necessary, but not sufficient condition - the standard deviation estimate of the fit gradient is also important.
# get the existing scale parameters
flow_fxscaler = logdata.parameters["FLOW_FXSCALER"]
flow_fyscaler = logdata.parameters["FLOW_FYSCALER"]
# load required optical flow data
if "OF" in logdata.channels:
flowX = np.zeros(len(logdata.channels["OF"]["flowX"].listData))
for i in range(len(logdata.channels["OF"]["flowX"].listData)):
(line, flowX[i]) = logdata.channels["OF"]["flowX"].listData[i]
bodyX = np.zeros(len(logdata.channels["OF"]["bodyX"].listData))
for i in range(len(logdata.channels["OF"]["bodyX"].listData)):
(line, bodyX[i]) = logdata.channels["OF"]["bodyX"].listData[i]
flowY = np.zeros(len(logdata.channels["OF"]["flowY"].listData))
for i in range(len(logdata.channels["OF"]["flowY"].listData)):
(line, flowY[i]) = logdata.channels["OF"]["flowY"].listData[i]
bodyY = np.zeros(len(logdata.channels["OF"]["bodyY"].listData))
for i in range(len(logdata.channels["OF"]["bodyY"].listData)):
(line, bodyY[i]) = logdata.channels["OF"]["bodyY"].listData[i]
flow_time_us = np.zeros(len(logdata.channels["OF"]["TimeUS"].listData))
for i in range(len(logdata.channels["OF"]["TimeUS"].listData)):
(line, flow_time_us[i]) = logdata.channels["OF"]["TimeUS"].listData[i]
flow_qual = np.zeros(len(logdata.channels["OF"]["Qual"].listData))
for i in range(len(logdata.channels["OF"]["Qual"].listData)):
(line, flow_qual[i]) = logdata.channels["OF"]["Qual"].listData[i]
else:
FAIL()
self.result.statusMessage = "FAIL: no optical flow data\n"
return
# load required attitude data
if "ATT" in logdata.channels:
Roll = np.zeros(len(logdata.channels["ATT"]["Roll"].listData))
for i in range(len(logdata.channels["ATT"]["Roll"].listData)):
(line, Roll[i]) = logdata.channels["ATT"]["Roll"].listData[i]
Pitch = np.zeros(len(logdata.channels["ATT"]["Pitch"].listData))
for i in range(len(logdata.channels["ATT"]["Pitch"].listData)):
(line, Pitch[i]) = logdata.channels["ATT"]["Pitch"].listData[i]
att_time_us = np.zeros(len(logdata.channels["ATT"]["TimeUS"].listData))
for i in range(len(logdata.channels["ATT"]["TimeUS"].listData)):
(line, att_time_us[i]) = logdata.channels["ATT"]["TimeUS"].listData[i]
else:
FAIL()
self.result.statusMessage = "FAIL: no attitude data\n"
return
# calculate the start time for the roll calibration
startTime = int(0)
startRollIndex = int(0)
for i in range(len(Roll)):
if abs(Roll[i]) > tilt_threshold:
startTime = att_time_us[i]
break
for i in range(len(flow_time_us)):
if flow_time_us[i] > startTime:
startRollIndex = i
break
# calculate the end time for the roll calibration
endTime = int(0)
endRollIndex = int(0)
for i in range(len(Roll)-1,-1,-1):
if abs(Roll[i]) > tilt_threshold:
endTime = att_time_us[i]
break
for i in range(len(flow_time_us)-1,-1,-1):
if flow_time_us[i] < endTime:
endRollIndex = i
break
# check we have enough roll data points
if (endRollIndex - startRollIndex <= min_num_points):
FAIL()
self.result.statusMessage = "FAIL: insufficient roll data pointsa\n"
return
# resample roll test data excluding data before first movement and after last movement
# also exclude data where there is insufficient angular rate
flowX_resampled = []
bodyX_resampled = []
flowX_time_us_resampled = []
for i in range(len(Roll)):
if (i >= startRollIndex) and (i <= endRollIndex) and (abs(bodyX[i]) > min_rate_threshold) and (abs(bodyX[i]) < max_rate_threshold) and (flow_qual[i] > quality_threshold):
flowX_resampled.append(flowX[i])
bodyX_resampled.append(bodyX[i])
flowX_time_us_resampled.append(flow_time_us[i])
# calculate the start time for the pitch calibration
startTime = 0
startPitchIndex = int(0)
for i in range(len(Pitch)):
if abs(Pitch[i]) > tilt_threshold:
startTime = att_time_us[i]
break
for i in range(len(flow_time_us)):
if flow_time_us[i] > startTime:
startPitchIndex = i
break
# calculate the end time for the pitch calibration
endTime = 0
endPitchIndex = int(0)
for i in range(len(Pitch)-1,-1,-1):
if abs(Pitch[i]) > tilt_threshold:
endTime = att_time_us[i]
break
for i in range(len(flow_time_us)-1,-1,-1):
if flow_time_us[i] < endTime:
endPitchIndex = i
break
# check we have enough pitch data points
if (endPitchIndex - startPitchIndex <= min_num_points):
FAIL()
self.result.statusMessage = "FAIL: insufficient pitch data pointsa\n"
return
# resample pitch test data excluding data before first movement and after last movement
# also exclude data where there is insufficient or too much angular rate
flowY_resampled = []
bodyY_resampled = []
flowY_time_us_resampled = []
for i in range(len(Roll)):
if (i >= startPitchIndex) and (i <= endPitchIndex) and (abs(bodyY[i]) > min_rate_threshold) and (abs(bodyY[i]) < max_rate_threshold) and (flow_qual[i] > quality_threshold):
flowY_resampled.append(flowY[i])
bodyY_resampled.append(bodyY[i])
flowY_time_us_resampled.append(flow_time_us[i])
# fit a straight line to the flow vs body rate data and calculate the scale factor parameter required to achieve a slope of 1
coef_flow_x , cov_x = np.polyfit(bodyX_resampled,flowX_resampled,1,rcond=None, full=False, w=None, cov=True)
coef_flow_y , cov_y = np.polyfit(bodyY_resampled,flowY_resampled,1,rcond=None, full=False, w=None, cov=True)
# taking the exisiting scale factor parameters into account, calculate the parameter values reequired to achieve a unity slope
flow_fxscaler_new = int(1000 * (((1 + 0.001 * float(flow_fxscaler))/coef_flow_x[0] - 1)))
flow_fyscaler_new = int(1000 * (((1 + 0.001 * float(flow_fyscaler))/coef_flow_y[0] - 1)))
# Do a sanity check on the scale factor variance
if sqrt(cov_x[0][0]) > param_std_threshold or sqrt(cov_y[0][0]) > param_std_threshold:
FAIL()
self.result.statusMessage = "FAIL: inaccurate fit - poor quality or insufficient data\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n" % (round(1000*sqrt(cov_x[0][0])),round(1000*sqrt(cov_y[0][0])))
# Do a sanity check on the scale factors
if abs(flow_fxscaler_new) > param_abs_threshold or abs(flow_fyscaler_new) > param_abs_threshold:
FAIL()
self.result.statusMessage = "FAIL: required scale factors are excessive\nFLOW_FXSCALER=%i\nFLOW_FYSCALER=%i\n" % (flow_fxscaler,flow_fyscaler)
# display recommended scale factors
self.result.statusMessage = "Set FLOW_FXSCALER to %i\nSet FLOW_FYSCALER to %i\n\nCal plots saved to flow_calibration.pdf\nCal parameters saved to flow_calibration.param\n\nFLOW_FXSCALER 1STD = %u\nFLOW_FYSCALER 1STD = %u\n" % (flow_fxscaler_new,flow_fyscaler_new,round(1000*sqrt(cov_x[0][0])),round(1000*sqrt(cov_y[0][0])))
# calculate fit display data
body_rate_display = [-max_rate_threshold,max_rate_threshold]
fit_coef_x = np.poly1d(coef_flow_x)
flowX_display = fit_coef_x(body_rate_display)
fit_coef_y = np.poly1d(coef_flow_y)
flowY_display = fit_coef_y(body_rate_display)
# plot and save calibration test points to PDF
from matplotlib.backends.backend_pdf import PdfPages
output_plot_filename = "flow_calibration.pdf"
pp = PdfPages(output_plot_filename)
plt.figure(1,figsize=(20,13))
plt.subplot(2,1,1)
plt.plot(bodyX_resampled,flowX_resampled,'b', linestyle=' ', marker='o',label="test points")
plt.plot(body_rate_display,flowX_display,'r',linewidth=2.5,label="linear fit")
plt.title('X axis flow rate vs gyro rate')
plt.ylabel('flow rate (rad/s)')
plt.xlabel('gyro rate (rad/sec)')
plt.grid()
plt.legend(loc='upper left')
# draw plots
plt.subplot(2,1,2)
plt.plot(bodyY_resampled,flowY_resampled,'b', linestyle=' ', marker='o',label="test points")
plt.plot(body_rate_display,flowY_display,'r',linewidth=2.5,label="linear fit")
plt.title('Y axis flow rate vs gyro rate')
plt.ylabel('flow rate (rad/s)')
plt.xlabel('gyro rate (rad/sec)')
plt.grid()
plt.legend(loc='upper left')
pp.savefig()
plt.figure(2,figsize=(20,13))
plt.subplot(2,1,1)
plt.plot(flow_time_us,flowX,'b',label="flow rate - all")
plt.plot(flow_time_us,bodyX,'r',label="gyro rate - all")
plt.plot(flowX_time_us_resampled,flowX_resampled,'c', linestyle=' ', marker='o',label="flow rate - used")
plt.plot(flowX_time_us_resampled,bodyX_resampled,'m', linestyle=' ', marker='o',label="gyro rate - used")
plt.title('X axis flow and body rate vs time')
plt.ylabel('rate (rad/s)')
plt.xlabel('time (usec)')
plt.grid()
plt.legend(loc='upper left')
# draw plots
plt.subplot(2,1,2)
plt.plot(flow_time_us,flowY,'b',label="flow rate - all")
plt.plot(flow_time_us,bodyY,'r',label="gyro rate - all")
plt.plot(flowY_time_us_resampled,flowY_resampled,'c', linestyle=' ', marker='o',label="flow rate - used")
plt.plot(flowY_time_us_resampled,bodyY_resampled,'m', linestyle=' ', marker='o',label="gyro rate - used")
plt.title('Y axis flow and body rate vs time')
plt.ylabel('rate (rad/s)')
plt.xlabel('time (usec)')
plt.grid()
plt.legend(loc='upper left')
pp.savefig()
# close the pdf file
pp.close()
# close all figures
plt.close("all")
# write correction parameters to file
test_results_filename = "flow_calibration.param"
file = open(test_results_filename,"w")
file.write("FLOW_FXSCALER"+" "+str(flow_fxscaler_new)+"\n")
file.write("FLOW_FYSCALER"+" "+str(flow_fyscaler_new)+"\n")
file.close()
except KeyError as e:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = str(e) + ' not found'