You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
79 lines
2.6 KiB
79 lines
2.6 KiB
from __future__ import print_function |
|
|
|
from LogAnalyzer import Test,TestResult |
|
import DataflashLog |
|
|
|
|
|
class TestDupeLogData(Test):
    '''test for duplicated data in log, which has been happening on PX4/Pixhawk

    A handful of evenly spaced runs of consecutive ATT.Pitch values are taken
    from the log, and each run is searched for again further on in the same
    data. An identical run found later marks the log as containing duplicated
    data.
    '''

    # number of consecutive ATT.Pitch entries that make up one sample
    SAMPLE_LENGTH = 20
    # the ATT data is cut into this many equal steps; one sample starts at
    # each interior step boundary
    SAMPLE_DIVISIONS = 11

    def __init__(self):
        '''set up the base Test and give this test its display name'''
        Test.__init__(self)
        self.name = "Dupe Log Data"

    def __matchSample(self, sample, sampleStartIndex, logdata):
        '''return the line number where a match is found, otherwise return False

        sample           -- slice of the ATT.Pitch listData; each entry is
                            indexed as [0] (reported as the log line number)
                            and [1] (the value that is compared)
        sampleStartIndex -- index in the ATT.Pitch listData the sample was cut from
        logdata          -- log object providing channels["ATT"]["Pitch"].listData

        Only data after sampleStartIndex is searched.
        '''
        # ignore if all data in sample is the same value: a flat run (e.g. a
        # vehicle sitting still) would match other flat runs without being a dupe
        nSame = sum(1 for s in sample if s[1] == sample[0][1])
        if nSame == self.SAMPLE_LENGTH:
            return False

        # compare the sample against every later position in the pitch data
        data = logdata.channels["ATT"]["Pitch"].listData
        dataLength = len(data)
        # start one past the sample's own position so it never matches itself
        for i in range(sampleStartIndex + 1, dataLength):
            j = 0
            while (j < self.SAMPLE_LENGTH and (i + j) < dataLength
                   and data[i + j][1] == sample[j][1]):
                j += 1
            if j == self.SAMPLE_LENGTH:  # all samples match
                return data[i][0]

        return False

    def run(self, logdata, verbose):
        '''run the duplicate-data check and store the outcome in self.result

        logdata -- log object providing channels["ATT"]["Pitch"].listData
        verbose -- accepted for interface compatibility; not used by this test

        Result status is GOOD when no duplicate is found, FAIL when a sample is
        found again later in the log, and UNKNOWN when there is no ATT data or
        too little of it to place the sample points.
        '''
        self.result = TestResult()
        self.result.status = TestResult.StatusType.GOOD

        # this could be made more flexible by not hard-coding to use ATT data, could make it dynamic based on whatever is available as long as it is highly variable
        if "ATT" not in logdata.channels:
            self.result.status = TestResult.StatusType.UNKNOWN
            self.result.statusMessage = "No ATT log data"
            return

        pitchData = logdata.channels["ATT"]["Pitch"].listData
        attEndIndex = len(pitchData) - 1

        # floor division: range() needs an integer step, and "/" produces a
        # float on Python 3
        step = attEndIndex // self.SAMPLE_DIVISIONS
        if step < 1:
            # too few data points to spread sample points over; a zero step
            # would make range() raise
            self.result.status = TestResult.StatusType.UNKNOWN
            self.result.statusMessage = "Not enough ATT log data"
            return

        # get 20 datapoints of pitch from each sample location and check for a match elsewhere
        for i in range(step, attEndIndex - step, step):
            sample = pitchData[i:i + self.SAMPLE_LENGTH]
            matchedLine = self.__matchSample(sample, i, logdata)
            # compare against the False sentinel explicitly so a match
            # reported at line number 0 is not mistaken for "no match"
            if matchedLine is not False:
                self.result.status = TestResult.StatusType.FAIL
                self.result.statusMessage = "Duplicate data chunks found in log (%d and %d)" % (sample[0][0], matchedLine)
                return
|
|
|
|
|
|
|
|
|
|
|
|
|
|