Browse Source

LogAnalyzer: Initial commit for discussion

A lot of this is still stub code, but far enough along for discussion
and feedback. Some good example tests are TestVibration and TestBrownout

datatypes handled correctly now (previsouly all read as floats), added
flag to skip bad input lines, now prints some general log info (size,
duration, etc), added some basic performance timing,
master
Andrew Chapman 11 years ago committed by Andrew Tridgell
parent
commit
36e480483f
  1. 287
      Tools/LogAnalyzer/DataflashLog.py
  2. 183
      Tools/LogAnalyzer/LogAnalyzer.py
  3. 7
      Tools/LogAnalyzer/UnitTest.py
  4. 7777
      Tools/LogAnalyzer/logs/mechanical_fail.log
  5. 2738
      Tools/LogAnalyzer/logs/nan.log
  6. 4197
      Tools/LogAnalyzer/logs/tradheli_brownout.log
  7. 9888
      Tools/LogAnalyzer/logs/underpowered.log
  8. 18
      Tools/LogAnalyzer/tests/TestBalanceTwist.py
  9. 34
      Tools/LogAnalyzer/tests/TestBrownout.py
  10. 54
      Tools/LogAnalyzer/tests/TestCompass.py
  11. 19
      Tools/LogAnalyzer/tests/TestEmpty.py
  12. 54
      Tools/LogAnalyzer/tests/TestEvents.py
  13. 32
      Tools/LogAnalyzer/tests/TestGPSGlitch.py
  14. 52
      Tools/LogAnalyzer/tests/TestParams.py
  15. 16
      Tools/LogAnalyzer/tests/TestPerformance.py
  16. 18
      Tools/LogAnalyzer/tests/TestPitchRollCoupling.py
  17. 18
      Tools/LogAnalyzer/tests/TestUnderpowered.py
  18. 34
      Tools/LogAnalyzer/tests/TestVCC.py
  19. 63
      Tools/LogAnalyzer/tests/TestVibration.py

287
Tools/LogAnalyzer/DataflashLog.py

@ -0,0 +1,287 @@ @@ -0,0 +1,287 @@
#
# Code to abstract the parsing of APM Dataflash log files, currently only used by the LogAnalyzer
#
# Initial code by Andrew Chapman (chapman@skymount.com), 16th Jan 2014
#
# TODO: log reading needs to be much more robust, only compatible with AC3.0.1-AC3.1 logs at this point, try to be compatible at least back to AC2.9.1, and APM:Plane too (skipping copter-only tests)
# TODO: rethink data storage, dictionaries are good for specific line lookups, but we don't end up doing a lot of that
import pprint # temp
import collections
import os
class Format:
'''Channel format as specified by the FMT lines in the log file'''
msgType = 0
msgLen = 0
name = ""
types = ""
labels = []
def __init__(self,msgType,msgLen,name,types,labels):
self.msgType = msgType
self.msgLen = msgLen
self.name = name
self.types = types
self.labels = labels.split(',')
def __str__(self):
return "%8s %s" % (self.name, `self.labels`)
class Channel:
'''storage for a single stream of data, i.e. all GPS.RelAlt values'''
dictData = None # dict of linenum->value # store dupe data in dict and list for now, until we decide which is the better way to go
listData = None # list of (linenum,value)
def __init__(self):
self.dictData = {}
self.listData = []
def getSegment(self, startLine, endLine):
'''returns a segment of this data (from startLine to endLine, inclusive) as a new Channel instance'''
segment = Channel()
segment.dictData = {k:v for k,v in self.dictData.iteritems() if k >= startLine and k <= endLine}
return segment
def min(self):
return min(self.dictData.values())
def max(self):
return max(self.dictData.values())
def avg(self):
return avg(self.dictData.values())
def getNearestValue(self, lineNumber, lookForwards=True):
'''return the nearest data value to the given lineNumber'''
if lineNumber in self.dictData:
return self.dictData[lineNumber]
offset = 1
if not lookForwards:
offset = -1
minLine = min(self.dictData.keys())
maxLine = max(self.dictData.keys())
line = max(minLine,lineNumber)
line = min(maxLine,line)
while line >= minLine and line <= maxLine:
if line in self.dictData:
return self.dictData[line]
line = line + offset
raise Exception("Error finding nearest value for line %d" % lineNumber)
class DataflashLogHelper:
@staticmethod
def getTimeAtLine(logdata, lineNumber):
'''returns the nearest GPS timestamp in milliseconds after the given line number'''
if not "GPS" in logdata.channels:
raise Exception("no GPS log data found")
# older logs use 'TIme', newer logs use 'TimeMS'
timeLabel = "TimeMS"
if "Time" in logdata.channels["GPS"]:
timeLabel = "Time"
while lineNumber <= logdata.lineCount:
if lineNumber in logdata.channels["GPS"][timeLabel].dictData:
return logdata.channels["GPS"][timeLabel].dictData[lineNumber]
lineNumber = lineNumber + 1
@staticmethod
def findLoiterChunks(logdata, minLengthSeconds=0, noRCInputs=True):
'''returns a list of (to,from) pairs defining sections of the log which are in loiter mode. Ordered from longest to shortest in time. If noRCInputs == True it only returns chunks with no control inputs'''
# TODO: implement noRCInputs handling when identifying stable loiter chunks, for now we're ignoring it
def chunkSizeCompare(chunk1, chunk2):
chunk1Len = chunk1[1]-chunk1[0]
chunk2Len = chunk2[1]-chunk2[0]
if chunk1Len == chunk2Len:
return 0
elif chunk1Len > chunk2Len:
return -1
else:
return 1
od = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0]))
chunks = []
for i in range(len(od.keys())):
if od.values()[i][0] == "LOITER":
startLine = od.keys()[i]
endLine = None
if i == len(od.keys())-1:
endLine = logdata.lineCount
else:
endLine = od.keys()[i+1]-1
chunkTimeSeconds = (DataflashLogHelper.getTimeAtLine(logdata,endLine)-DataflashLogHelper.getTimeAtLine(logdata,startLine)+1) / 1000.0
if chunkTimeSeconds > minLengthSeconds:
chunks.append((startLine,endLine))
#print "LOITER chunk: %d to %d, %d lines" % (startLine,endLine,endLine-startLine+1)
#print " (time %d to %d, %d seconds)" % (DataflashLogHelper.getTimeAtLine(logdata,startLine), DataflashLogHelper.getTimeAtLine(logdata,endLine), chunkTimeSeconds)
chunks.sort(chunkSizeCompare)
return chunks
@staticmethod
def isLogEmpty(logdata):
'''returns an human readable error string if the log is essentially empty, otherwise returns None'''
# naive check for now, see if the throttle output was ever above 20%
throttleThreshold = 200
if "CTUN" in logdata.channels:
maxThrottle = logdata.channels["CTUN"]["ThrOut"].max()
if maxThrottle < throttleThreshold:
return "Throttle never above 20%"
return None
class DataflashLog:
'''APM Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions to DataflashLogHelper class'''
filename = None
vehicleType = None # ArduCopter, ArduPlane, ArduRover, etc, verbatim as given by header
firmwareVersion = None
firmwareHash = None
freeRAM = None
hardwareType = None # APM 1, APM 2, PX4, MPNG, etc What is VRBrain? BeagleBone, etc? Needs more testing
formats = {} # name -> Format
parameters = {} # token -> value
messages = {} # lineNum -> message
modeChanges = {} # lineNum -> (mode,value)
channels = {} # lineLabel -> {dataLabel:Channel}
filesizeKB = None
durationSecs = None
lineCount = None
def getCopterType(self):
if self.vehicleType != "ArduCopter":
return None
motLabels = []
if "MOT" in self.formats: # not listed in PX4 log header for some reason?
motLabels = self.formats["MOT"].labels
if "GGain" in motLabels:
return "tradheli"
elif len(motLabels) == 4:
return "quad"
elif len(motLabels) == 6:
return "hex"
elif len(motLabels) == 8:
return "octo"
else:
return ""
def __castToFormatType(self,value,valueType):
'''using format characters from libraries/DataFlash/DataFlash.h to cast to basic python int/float/string types'''
intTypes = "bBhHiIM"
floatTypes = "fcCeEL"
charTypes = "nNZ"
if valueType in floatTypes:
return float(value)
elif valueType in intTypes:
return int(value)
elif valueType in charTypes:
return str(value)
else:
raise Exception("Unknown value type of '%s' specified to __castToFormatType()" % valueType)
def __init__(self, logfile, ignoreBadlines=False):
self.read(logfile, ignoreBadlines)
def read(self, logfile, ignoreBadlines=False):
# TODO: dataflash log parsing code is *SUPER* hacky, should re-write more methodically
self.filename = logfile
f = open(self.filename, 'r')
lineNumber = 0
copterLogPre3Header = False
copterLogPre3Params = False
knownHardwareTypes = ["APM", "PX4", "MPNG"]
for line in f:
lineNumber = lineNumber + 1
try:
#print "Reading line: %d" % lineNumber
line = line.strip('\n\r')
tokens = line.split(', ')
# first handle the log header lines
if copterLogPre3Header and line[0:15] == "SYSID_SW_MREV: ":
copterLogPre3Header = False
copterLogPre3Params = True
if len(tokens) == 1:
tokens2 = line.split(' ')
if line == "":
pass
elif len(tokens2) == 1 and tokens2[0].isdigit(): # log index
pass
elif len(tokens2) == 3 and tokens2[0] == "Free" and tokens2[1] == "RAM:":
self.freeRAM = int(tokens2[2])
elif tokens2[0] in knownHardwareTypes:
self.hardwareType = line # not sure if we can parse this more usefully, for now only need to report it back verbatim
elif line[0:8] == "FW Ver: ":
copterLogPre3Header = True
elif copterLogPre3Header:
pass # just skip over all that until we hit the PARM lines
elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][0] == "V": # e.g. ArduCopter V3.1 (5c6503e2)
self.vehicleType = tokens2[0]
self.firmwareVersion = tokens2[1]
if len(tokens2) == 3:
self.firmwareHash = tokens2[2][1:-1]
elif len(tokens2) == 2 and copterLogPre3Params:
pName = tokens2[0]
self.parameters[pName] = float(tokens2[1])
else:
errorMsg = "Error parsing line %d of log file: %s" % (lineNumber, self.filename)
if ignoreBadlines:
print errorMsg + " (skipping line)"
else:
raise Exception(errorMsg)
# now handle the non-log data stuff, format descriptions, params, etc
elif tokens[0] == "FMT":
format = None
if len(tokens) == 6:
format = Format(tokens[1],tokens[2],tokens[3],tokens[4],tokens[5])
elif len(tokens) == 5: # some logs have FMT STRT with no labels
format = Format(tokens[1],tokens[2],tokens[3],tokens[4],"")
else:
raise Exception("FMT error on line %d, nTokens: %d" % (lineNumber, len(tokens)))
#print format # TEMP
self.formats[tokens[3]] = format
elif tokens[0] == "PARM":
pName = tokens[1]
self.parameters[pName] = float(tokens[2])
elif tokens[0] == "MSG":
self.messages[lineNumber] = tokens[1]
elif tokens[0] == "MODE":
self.modeChanges[lineNumber] = (tokens[1],int(tokens[2]))
# anything else must be the log data
elif not copterLogPre3Header:
groupName = tokens[0]
tokens2 = line.split(', ')
# first time seeing this type of log line, create the channel storage
if not groupName in self.channels:
self.channels[groupName] = {}
for label in self.formats[groupName].labels:
self.channels[groupName][label] = Channel()
#print "Channel definition for group %s, data at address %s" % (groupName, hex(id(self.channels[groupName][label].dictData)))
#pprint.pprint(self.channels[groupName]) # TEMP!!!
# check the number of tokens matches between the line and what we're expecting from the FMT definition
if len(tokens2) != (len(self.formats[groupName].labels) + 1):
errorMsg = "Error on line %d of log file: %s, %s line's value count (%d) not matching FMT specification (%d)" % (lineNumber, self.filename, groupName, len(tokens2), len(self.formats[groupName].labels))
if ignoreBadlines:
print errorMsg + " (skipping line)"
continue
else:
raise Exception(errorMsg)
# store each token in its relevant channel
for (index,label) in enumerate(self.formats[groupName].labels):
#value = float(tokens2[index+1]) # simple read without handling datatype
value = self.__castToFormatType(tokens2[index+1], self.formats[groupName].types[index]) # handling datatype via this call slows down ready by about 50%
channel = self.channels[groupName][label]
#print "Set data {%s,%s} for line %d to value %s, of type %c, stored at address %s" % (groupName, label, lineNumber, `value`, self.formats[groupName].types[index], hex(id(channel.dictData)))
channel.dictData[lineNumber] = value
channel.listData.append((lineNumber,value))
except:
print "Error parsing line %d of log file %s" % (lineNumber,self.filename)
raise
# gather some general stats about the log
self.lineCount = lineNumber
self.filesizeKB = os.path.getsize(self.filename) / 1024.0
if "GPS" in self.channels:
# the GPS time label changed at some point, need to handle both
timeLabel = "TimeMS"
if timeLabel not in self.channels["GPS"]:
timeLabel = "Time"
firstTimeGPS = self.channels["GPS"][timeLabel].listData[0][1]
lastTimeGPS = self.channels["GPS"][timeLabel].listData[-1][1]
self.durationSecs = (lastTimeGPS-firstTimeGPS) / 1000

183
Tools/LogAnalyzer/LogAnalyzer.py

@ -0,0 +1,183 @@ @@ -0,0 +1,183 @@
#!/usr/bin/env python
#
# A module to analyze and identify any common problems which can be determined from log files
#
# Initial code by Andrew Chapman (chapman@skymount.com), 16th Jan 2014
#
# some logging oddities noticed while doing this, to be followed up on:
# - tradheli MOT labels Mot1,Mot2,Mot3,Mot4,GGain
# - Pixhawk doesn't output one of the FMT labels... forget which one
# - MAG offsets seem to be constant (only seen data on Pixhawk)
# - MAG offsets seem to be cast to int before being output? (param is -84.67, logged as -84)
# TODO - unify result statusMessage and extraOutput. simple tests set statusMessage, complex ones append to it with newlines
import DataflashLog
import pprint # temp
import imp
import glob
import inspect
import os, sys
import argparse
import datetime
import time
class TestResult:
'''all tests pass back a standardized result'''
class StatusType:
# NA means not applicable for this log (e.g. copter tests against a plane log), UNKNOWN means it is missing data required for the test
PASS, FAIL, WARN, UNKNOWN, NA = range(5)
status = None
statusMessage = ""
extraFeedback = ""
class Test:
'''base class to be inherited by each specific log test. Each test should be quite granular so we have lots of small tests with clear results'''
name = ""
result = None # will be an instance of TestResult after being run
execTime = None
enable = True
def run(self, logdata):
pass
class TestSuite:
'''registers test classes'''
tests = []
logfile = None
logdata = None
def __init__(self):
# dynamically load in Test subclasses from the 'tests' folder
dirName = os.path.dirname(os.path.abspath(__file__))
testScripts = glob.glob(dirName + '/tests/*.py')
testClasses = []
for script in testScripts:
m = imp.load_source("m",script)
for name, obj in inspect.getmembers(m, inspect.isclass):
if name not in testClasses and inspect.getsourcefile(obj) == script:
testClasses.append(name)
self.tests.append(obj())
# and here's an example of explicitly loading a Test class if you wanted to do that
# m = imp.load_source("m", dirName + '/tests/TestBadParams.py')
# self.tests.append(m.TestBadParams())
def run(self, logdata):
'''run all registered tests in a single call'''
self.logdata = logdata
self.logfile = logdata.filename
for test in self.tests:
# run each test in turn, gathering timing info
if test.enable:
startTime = time.time()
test.run(self.logdata) # RUN THE TEST
endTime = time.time()
test.execTime = endTime-startTime
def outputPlainText(self, outputStats):
print 'Dataflash log analysis report for file: ' + self.logfile
print 'Log size: %.2fmb (%d lines)' % (self.logdata.filesizeKB / 1024.0, self.logdata.lineCount)
print 'Log duration: %s' % str(datetime.timedelta(seconds=self.logdata.durationSecs)) + '\n'
if self.logdata.vehicleType == "ArduCopter" and self.logdata.getCopterType():
print 'Vehicle Type: %s (%s)' % (self.logdata.vehicleType, self.logdata.getCopterType())
else:
print 'Vehicle Type: %s' % self.logdata.vehicleType
print 'Firmware Version: %s (%s)' % (self.logdata.firmwareVersion, self.logdata.firmwareHash)
print 'Hardware: %s' % self.logdata.hardwareType
print 'Free RAM: %s' % self.logdata.freeRAM
print '\n'
print "Test Results:"
for test in self.tests:
if not test.enable:
continue
execTime = ""
if outputStats:
execTime = " (%.2fms)" % (test.execTime)
if test.result.status == TestResult.StatusType.PASS:
print " %20s: PASS %-50s%s" % (test.name, test.result.statusMessage,execTime)
elif test.result.status == TestResult.StatusType.FAIL:
print " %20s: FAIL %-50s%s [GRAPH]" % (test.name, test.result.statusMessage,execTime)
elif test.result.status == TestResult.StatusType.WARN:
print " %20s: WARN %-50s%s [GRAPH]" % (test.name, test.result.statusMessage,execTime)
elif test.result.status == TestResult.StatusType.NA:
# skip any that aren't relevant for this vehicle/hardware/etc
continue
else:
print " %20s: UNKNOWN %-50s%s" % (test.name, test.result.statusMessage,execTime)
if test.result.extraFeedback:
for line in test.result.extraFeedback.strip().split('\n'):
print " %20s %s" % ("",line)
print '\n'
# temp - test some spot values
#print "GPS abs alt on line 24126 is " + `self.logdata.channels["GPS"]["Alt"].dictData[24126]` # 52.03
#print "ATT pitch on line 22153 is " + `self.logdata.channels["ATT"]["Pitch"].dictData[22153]` # -7.03
#gpsAlt = self.logdata.channels["GPS"]["Alt"]
#print "All GPS Alt data: %s\n\n" % gpsAlt.dictData
#gpsAltSeg = gpsAlt.getSegment(426,711)
#print "Segment of GPS Alt data from %d to %d: %s\n\n" % (426,711,gpsAltSeg.dictData)
def outputXML(self, xmlFile):
# TODO: implement XML output
# ...
raise Exception("outputXML() not implemented yet")
def main():
dirName = os.path.dirname(os.path.abspath(__file__))
# deal with command line arguments
parser = argparse.ArgumentParser(description='Analyze an APM Dataflash log for known issues')
parser.add_argument('logfile', type=argparse.FileType('r'), help='path to Dataflash log file')
parser.add_argument('-q', '--quiet', metavar='', action='store_const', const=True, help='quiet mode, do not print results')
parser.add_argument('-s', '--stats', metavar='', action='store_const', const=True, help='output performance stats')
parser.add_argument('-i', '--ignore', metavar='', action='store_const', const=True, help='ignore bad data lines')
parser.add_argument('-e', '--empty', metavar='', action='store_const', const=True, help='run an initial check for an empty log')
parser.add_argument('-x', '--xml', type=str, metavar='XML file', nargs='?', const='', default='', help='write output to specified XML file')
args = parser.parse_args()
# load the log
startTime = time.time()
logdata = DataflashLog.DataflashLog(args.logfile.name, ignoreBadlines=args.ignore) # read log
endTime = time.time()
if args.stats:
print "Log file read time: %.2f seconds" % (endTime-startTime)
# check for empty log if requested
if args.empty:
emptyErr = DataflashLog.DataflashLogHelper.isLogEmpty(logdata)
if emptyErr:
sys.stderr.write("Empty log file: %s, %s" % (logdata.filename, emptyErr))
sys.exit(1)
#run the tests, and gather timings
testSuite = TestSuite()
startTime = time.time()
testSuite.run(logdata) # run tests
endTime = time.time()
if args.stats:
print "Test suite run time: %.2f seconds" % (endTime-startTime)
# deal with output
if not args.quiet:
testSuite.outputPlainText(args.stats)
if args.xml:
testSuite.outputXML(args.xml)
if not args.quiet:
print "XML output written to file: %s\n" % args.xml
if __name__ == "__main__":
main()

7
Tools/LogAnalyzer/UnitTest.py

@ -0,0 +1,7 @@ @@ -0,0 +1,7 @@
#
#
# Unit and regression tests for the LogAnalyzer code
#
#
# TODO: implement unit+regression tests

7777
Tools/LogAnalyzer/logs/mechanical_fail.log

File diff suppressed because it is too large Load Diff

2738
Tools/LogAnalyzer/logs/nan.log

File diff suppressed because it is too large Load Diff

4197
Tools/LogAnalyzer/logs/tradheli_brownout.log

File diff suppressed because it is too large Load Diff

9888
Tools/LogAnalyzer/logs/underpowered.log

File diff suppressed because it is too large Load Diff

18
Tools/LogAnalyzer/tests/TestBalanceTwist.py

@ -0,0 +1,18 @@ @@ -0,0 +1,18 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
class TestBalanceTwist(Test):
'''test for badly unbalanced copter, including yaw twist'''
def __init__(self):
self.name = "Balance/Twist"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
if logdata.vehicleType == "ArduPlane":
self.result.status = TestResult.StatusType.NA
# TODO: implement copter test for unbalanced or twisted copter frame

34
Tools/LogAnalyzer/tests/TestBrownout.py

@ -0,0 +1,34 @@ @@ -0,0 +1,34 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
import collections
class TestBrownout(Test):
'''test for a log that has been truncated in flight'''
def __init__(self):
self.name = "Brownout"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
# step through the arm/disarm events in order, to see if they're symmetrical
# note: it seems landing detection isn't robust enough to rely upon here, so we'll only consider arm+disarm, not takeoff+land
evData = logdata.channels["EV"]["Id"]
orderedEVData = collections.OrderedDict(sorted(evData.dictData.items(), key=lambda t: t[0]))
isArmed = False
for line,ev in orderedEVData.iteritems():
if ev == 10:
isArmed = True
elif ev == 11:
isArmed = False
# check for relative altitude at end
if "CTUN" in logdata.channels and "BarAlt" in logdata.channels["CTUN"]:
finalAlt = logdata.channels["CTUN"]["BarAlt"].getNearestValue(logdata.lineCount, lookForwards=False)
finalAltMax = 3.0 # max alt offset that we'll still consider to be on the ground
if isArmed and finalAlt > finalAltMax:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Truncated Log? Ends while armed at altitude %.2fm" % finalAlt

54
Tools/LogAnalyzer/tests/TestCompass.py

@ -0,0 +1,54 @@ @@ -0,0 +1,54 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
class TestCompass(Test):
'''test for compass offsets and throttle interference'''
def __init__(self):
self.name = "Compass"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
# quick test that compass offset parameters are within recommended range (+/- 150)
maxOffset = 150
if logdata.hardwareType == "PX4":
maxOffset = 250 # Pixhawks have their offsets scaled larger
compassOfsX = logdata.parameters["COMPASS_OFS_X"]
compassOfsY = logdata.parameters["COMPASS_OFS_Y"]
compassOfsZ = logdata.parameters["COMPASS_OFS_Z"]
#print "MAG params: %.2f %.2f %.2f" % (compassOfsX,compassOfsY,compassOfsZ)
if abs(compassOfsX) > maxOffset or abs(compassOfsY) > maxOffset or abs(compassOfsZ) > maxOffset:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Large compass off params (X:%.2f, Y:%.2f, Z:%.2f)" % (compassOfsX,compassOfsY,compassOfsZ)
# check for excessive compass offsets using MAG data if present (it can change during flight is compass learn is on)
if "MAG" in logdata.channels:
err = False
#print "MAG min/max xyz... %.2f %.2f %.2f %.2f %.2f %.2f " % (logdata.channels["MAG"]["OfsX"].min(), logdata.channels["MAG"]["OfsX"].max(), logdata.channels["MAG"]["OfsY"].min(), logdata.channels["MAG"]["OfsY"].min(), logdata.channels["MAG"]["OfsZ"].min(), logdata.channels["MAG"]["OfsZ"].max())
if logdata.channels["MAG"]["OfsX"].max() > maxOffset:
self.result.extraFeedback = self.result.extraFeedback + "X: %.2f\n" % logdata.channels["MAG"]["OfsX"].max()
err = True
if logdata.channels["MAG"]["OfsX"].min() < -maxOffset:
self.result.extraFeedback = self.result.extraFeedback + "X: %.2f\n" % logdata.channels["MAG"]["OfsX"].min()
err = True
if logdata.channels["MAG"]["OfsY"].max() > maxOffset:
self.result.extraFeedback = self.result.extraFeedback + "Y: %.2f\n" % logdata.channels["MAG"]["OfsY"].max()
err = True
if logdata.channels["MAG"]["OfsY"].min() < -maxOffset:
self.result.extraFeedback = self.result.extraFeedback + "Y: %.2f\n" % logdata.channels["MAG"]["OfsY"].min()
err = True
if logdata.channels["MAG"]["OfsZ"].max() > maxOffset:
self.result.extraFeedback = self.result.extraFeedback + "Z: %.2f\n" % logdata.channels["MAG"]["OfsZ"].max()
err = True
if logdata.channels["MAG"]["OfsZ"].min() < -maxOffset:
self.result.extraFeedback = self.result.extraFeedback + "Z: %.2f\n" % logdata.channels["MAG"]["OfsZ"].min()
err = True
if err:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Large compass offset in MAG data"
# TODO: check for compass/throttle interference. Need to add mag_field to logging, or calc our own from MAG data if present
# TODO: can we derive a compassmot percentage from the params? Fail if >30%?

19
Tools/LogAnalyzer/tests/TestEmpty.py

@ -0,0 +1,19 @@ @@ -0,0 +1,19 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
class TestEmpty(Test):
'''test for empty or near-empty logs'''
def __init__(self):
self.name = "Empty"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
# all the logic for this test is in the helper function, as it can also be called up front as an early exit
emptyErr = DataflashLog.DataflashLogHelper.isLogEmpty(logdata)
if emptyErr:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Empty log? " + emptyErr

54
Tools/LogAnalyzer/tests/TestEvents.py

@ -0,0 +1,54 @@ @@ -0,0 +1,54 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
class TestEvents(Test):
'''test for erroneous events and failsafes'''
def __init__(self):
self.name = "Event/Failsafe"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
errors = set()
if "ERR" in logdata.channels:
errLines = sorted(logdata.channels["ERR"]["Subsys"].dictData.keys())
for line in errLines:
subSys = logdata.channels["ERR"]["Subsys"].dictData[line]
eCode = logdata.channels["ERR"]["ECode"].dictData[line]
if subSys == 2 and (eCode == 1):
errors.add("PPM")
elif subSys == 3 and (eCode == 1 or eCode == 2):
errors.add("COMPASS")
elif subSys == 5 and (eCode == 1):
errors.add("FS_THR")
elif subSys == 6 and (eCode == 1):
errors.add("FS_BATT")
elif subSys == 7 and (eCode == 1):
errors.add("GPS")
elif subSys == 8 and (eCode == 1):
errors.add("GCS")
elif subSys == 9 and (eCode == 1 or eCode == 2):
errors.add("FENCE")
elif subSys == 10:
errors.add("FLT_MODE")
elif subSys == 11 and (eCode == 2):
errors.add("GPS_GLITCH")
elif subSys == 12 and (eCode == 1):
errors.add("CRASH")
if len(errors) > 0:
if len(errors) == 1 and "FENCE" in errors:
self.result.status = TestResult.StatusType.WARN
else:
self.result.status = TestResult.StatusType.FAIL
if len(errors) == 1:
self.result.statusMessage = "ERR found: "
else:
self.result.statusMessage = "ERRs found: "
for err in errors:
self.result.statusMessage = self.result.statusMessage + err + " "

32
Tools/LogAnalyzer/tests/TestGPSGlitch.py

@ -0,0 +1,32 @@ @@ -0,0 +1,32 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
class TestGPSGlitch(Test):
'''test for bad GPS data (satellite count, hdop), and later for sudden repositioning beyond what the drone could do'''
def __init__(self):
self.name = "GPS"
def run(self, logdata):
self.result = TestResult()
# define and check different thresholds for WARN level and FAIL level
minSatsWARN = 6
minSatsFAIL = 5
maxHDopWARN = 3.0
maxHDopFAIL = 10.0
foundBadSatsWarn = logdata.channels["GPS"]["NSats"].min() < minSatsWARN
foundBadHDopWarn = logdata.channels["GPS"]["HDop"].max() > maxHDopWARN
foundBadSatsFail = logdata.channels["GPS"]["NSats"].min() < minSatsFAIL
foundBadHDopFail = logdata.channels["GPS"]["HDop"].max() > maxHDopFAIL
if foundBadSatsFail or foundBadHDopFail:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Min satellites: %s, Max HDop: %s" % (logdata.channels["GPS"]["NSats"].min(), logdata.channels["GPS"]["HDop"].max())
elif foundBadSatsWarn or foundBadHDopWarn:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Min satellites: %s, Max HDop: %s" % (logdata.channels["GPS"]["NSats"].min(), logdata.channels["GPS"]["HDop"].max())
else:
self.result.status = TestResult.StatusType.PASS
# TODO: also test for sudden repositioning beyond what the drone could reasonably achieve
# ...

52
Tools/LogAnalyzer/tests/TestParams.py

@ -0,0 +1,52 @@ @@ -0,0 +1,52 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
import math # for isnan()
class TestParams(Test):
'''test for any obviously bad parameters in the config'''
def __init__(self):
self.name = "Parameters"
def __checkParamIsEqual(self, paramName, expectedValue, logdata):
value = logdata.parameters[paramName]
if value != expectedValue:
self.result.status = TestResult.StatusType.FAIL
self.result.extraFeedback = self.result.extraFeedback + "%s set to %s, expecting %s\n" % (paramName, `value`, `expectedValue`)
def __checkParamIsLessThan(self, paramName, maxValue, logdata):
value = logdata.parameters[paramName]
if value >= maxValue:
self.result.status = TestResult.StatusType.FAIL
self.result.extraFeedback = self.result.extraFeedback + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `maxValue`)
def __checkParamIsMoreThan(self, paramName, minValue, logdata):
value = logdata.parameters[paramName]
if value <= minValue:
self.result.status = TestResult.StatusType.FAIL
self.result.extraFeedback = self.result.extraFeedback + "%s set to %s, expecting less than %s\n" % (paramName, `value`, `minValue`)
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS # PASS by default, tests below will override it if they fail
# check all params for NaN
for name,value in logdata.parameters.iteritems():
if math.isnan(value):
self.result.status = TestResult.StatusType.FAIL
self.result.extraFeedback = self.result.extraFeedback + name + " is NaN\n"
# add parameter checks below using the helper functions, any failures will trigger a FAIL status and accumulate info in extraFeedback
# if more complex checking or correlations are required you can access parameter values directly using the logdata.parameters[paramName] dict
if logdata.vehicleType == "ArduCopter":
self.__checkParamIsEqual ("MAG_ENABLE", 1, logdata)
self.__checkParamIsLessThan("THR_MIN", 200, logdata)
self.__checkParamIsLessThan("THR_MID", 650, logdata)
self.__checkParamIsMoreThan("THR_MID", 300, logdata)
# TODO: add more parameter tests, these are just an example...
elif logdata.vehicleType == "ArduPlane":
# TODO: add parameter checks for plane...
pass

16
Tools/LogAnalyzer/tests/TestPerformance.py

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
class TestPerformance(Test):
'''check performance monitoring messages (PM) for issues with slow loops, etc'''
def __init__(self):
self.name = "Performance"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
# TODO: test for performance warning messages, slow loops, etc

18
Tools/LogAnalyzer/tests/TestPitchRollCoupling.py

@ -0,0 +1,18 @@ @@ -0,0 +1,18 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
class TestPitchRollCoupling(Test):
'''test for divergence between input and output pitch/roll, i.e. mechanical failure or bad PID tuning'''
def __init__(self):
self.name = "Pitch/Roll"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
# TODO: implement pitch/roll input/output divergence testing -
# note: names changed from PitchIn to DesPitch at some point, check for both

18
Tools/LogAnalyzer/tests/TestUnderpowered.py

@ -0,0 +1,18 @@ @@ -0,0 +1,18 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
class TestUnderpowered(Test):
'''test for underpowered copter'''
def __init__(self):
self.name = "Underpowered"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
if logdata.vehicleType == "ArduPlane":
self.result.status = TestResult.StatusType.NA
# TODO: implement test for underpowered copter (use log Randy provided)

34
Tools/LogAnalyzer/tests/TestVCC.py

@ -0,0 +1,34 @@ @@ -0,0 +1,34 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
import collections
class TestVCC(Test):
'''test for VCC within recommendations, or abrupt end to log in flight'''
def __init__(self):
self.name = "VCC"
def run(self, logdata):
self.result = TestResult()
self.result.status = TestResult.StatusType.PASS
if not "CURR" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No CURR log data found"
return
# just a naive min/max test for now
vccMin = logdata.channels["CURR"]["Vcc"].min()
vccMax = logdata.channels["CURR"]["Vcc"].max()
vccDiff = vccMax - vccMin;
vccMinThreshold = 4.6 * 1000;
vccMaxDiff = 0.3 * 1000;
if vccDiff > vccMaxDiff:
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "VCC min/max diff %s, should be <%s" % (vccDiff/1000.0, vccMaxDiff/1000.0)
elif vccMin < vccMinThreshold:
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "VCC below minimum of %s (%s)" % (`vccMinThreshold/1000.0`,`vccMin/1000.0`)

63
Tools/LogAnalyzer/tests/TestVibration.py

@ -0,0 +1,63 @@ @@ -0,0 +1,63 @@
from LogAnalyzer import Test,TestResult
import DataflashLog
import numpy
class TestVibration(Test):
'''test for accelerometer vibration (accX/accY/accZ) within recommendations'''
def __init__(self):
self.name = "Vibration"
def run(self, logdata):
self.result = TestResult()
# constants
gravity = -9.81
aimRangeWarnXY = 1.5
aimRangeFailXY = 3.0
aimRangeWarnZ = 2.0 # gravity +/- aim range
aimRangeFailZ = 5.0 # gravity +/- aim range
# TODO: in Pixhawk should we use IMU or IMU2?
if not "IMU" in logdata.channels:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No IMU log data found"
return
# find some stable LOITER data to analyze, at least 10 seconds
chunks = DataflashLog.DataflashLogHelper.findLoiterChunks(logdata, minLengthSeconds=10, noRCInputs=True)
if not chunks:
self.result.status = TestResult.StatusType.UNKNOWN
self.result.statusMessage = "No stable LOITER log data found"
return
# for now we'll just use the first (largest) chunk of LOITER data
# TODO: ignore the first couple of secs to avoid bad data during transition - or can we check more analytically that we're stable?
# TODO: accumulate all LOITER chunks over min size, or just use the largest one?
startLine = chunks[0][0]
endLine = chunks[0][1]
#print "TestVibration using LOITER chunk from lines %s to %s" % (`startLine`, `endLine`)
def getStdDevIMU(logdata, channelName, startLine,endLine):
loiterData = logdata.channels["IMU"][channelName].getSegment(startLine,endLine)
numpyData = numpy.array(loiterData.dictData.values())
return numpy.std(numpyData)
# use 2x standard deviations as the metric, so if 95% of samples lie within the aim range we're good
stdDevX = abs(2 * getStdDevIMU(logdata,"AccX",startLine,endLine))
stdDevY = abs(2 * getStdDevIMU(logdata,"AccY",startLine,endLine))
stdDevZ = abs(2 * getStdDevIMU(logdata,"AccZ",startLine,endLine))
if (stdDevX > aimRangeFailXY) or (stdDevY > aimRangeFailXY) or (stdDevZ > aimRangeFailZ):
self.result.status = TestResult.StatusType.FAIL
self.result.statusMessage = "Vibration too high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
elif (stdDevX > aimRangeWarnXY) or (stdDevY > aimRangeWarnXY) or (stdDevZ > aimRangeWarnZ):
self.result.status = TestResult.StatusType.WARN
self.result.statusMessage = "Vibration slightly high (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
else:
self.result.status = TestResult.StatusType.PASS
self.result.statusMessage = "Good vibration values (X:%.2fg, Y:%.2fg, Z:%.2fg)" % (stdDevX,stdDevY,stdDevZ)
Loading…
Cancel
Save