#
# Code to abstract the parsing of APM Dataflash log files, currently only used by the LogAnalyzer
#
# Initial code by Andrew Chapman (amchapman@gmail.com), 16th Jan 2014
#

from __future__ import print_function

import collections
import os
import numpy
import bisect
import sys
import ctypes

from VehicleType import VehicleType, VehicleTypeString


class Format(object):
    '''Data channel format as specified by the FMT lines in the log file'''

    def __init__(self, msgType, msgLen, name, types, labels):
        self.NAME = 'FMT'
        self.msgType = msgType
        self.msgLen = msgLen
        self.name = name
        self.types = types
        self.labels = labels.split(',')

    def __str__(self):
        return "%8s %s" % (self.name, repr(self.labels))

    @staticmethod
    def trycastToFormatType(value, valueType):
        '''Cast a string to a basic python int/float/str type using the format characters
        from libraries/DataFlash/DataFlash.h. If the cast fails the value is returned
        unchanged; this is acceptable because text logs do not always match the declared
        format, e.g. MODE is declared as an int but is often written out as a name.'''
        try:
            if valueType in "fcCeELd":
                return float(value)
            elif valueType in "bBhHiIMQq":
                return int(value)
            elif valueType in "nNZ":
                return str(value)
        except:
            pass
        return value
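
    # Illustrative behaviour (values read from a text log arrive as strings):
    #   Format.trycastToFormatType("123", "i")   # -> 123
    #   Format.trycastToFormatType("2.5", "f")   # -> 2.5
    #   Format.trycastToFormatType("AUTO", "M")  # -> "AUTO" (int cast fails, value kept as-is)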
    def to_class(self):
        members = dict(
            NAME = self.name,
            labels = self.labels[:],
        )

        fieldtypes = [i for i in self.types]
        fieldlabels = self.labels[:]

        # field access
        for (label, _type) in zip(fieldlabels, fieldtypes):
            def createproperty(name, format):
                # extra scope for variable sanity
                # scaling via _NAME and def NAME(self): return self._NAME / SCALE
                propertyname = name
                attributename = '_' + name
                p = property(lambda x: getattr(x, attributename),
                             lambda x, v: setattr(x, attributename, Format.trycastToFormatType(v, format)))
                members[propertyname] = p
                members[attributename] = None
            createproperty(label, _type)

        # repr shows all values but the header
        members['__repr__'] = lambda x: "<{cls} {data}>".format(cls=x.__class__.__name__, data=' '.join(["{}:{}".format(k, getattr(x, '_' + k)) for k in x.labels]))

        def init(a, *x):
            if len(x) != len(a.labels):
                raise ValueError("Invalid Length")
            #print(list(zip(a.labels, x)))
            for (l, v) in zip(a.labels, x):
                try:
                    setattr(a, l, v)
                except Exception as e:
                    print("{} {} {} failed".format(a, l, v))
                    print(e)

        members['__init__'] = init

        # finally, create the class
        cls = type(
            'Log__{:s}'.format(self.name),
            (object,),
            members
        )
        #print(members)
        return cls


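# Minimal sketch of how a text-log Format is turned into a message class (the 'XMPL'
# definition below is hypothetical, not a real ArduPilot message):
#   fmt = Format(200, 16, 'XMPL', 'If', 'TimeMS,Value')
#   Entry = fmt.to_class()          # dynamically created Log__XMPL class
#   e = Entry('12345', '2.5')       # field values arrive as strings from the text log
#   e.TimeMS, e.Value               # -> 12345, 2.5 (cast via trycastToFormatType)
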
class logheader(ctypes.LittleEndianStructure):
    _fields_ = [
        ('head1', ctypes.c_uint8),
        ('head2', ctypes.c_uint8),
        ('msgid', ctypes.c_uint8),
    ]

    def __repr__(self):
        return "<logheader head1=0x{self.head1:x} head2=0x{self.head2:x} msgid=0x{self.msgid:x} ({self.msgid})>".format(self=self)


class BinaryFormat(ctypes.LittleEndianStructure):
    NAME = 'FMT'
    MSG = 128
    SIZE = 0
    FIELD_FORMAT = {
        'a': ctypes.c_int16 * 32,
        'b': ctypes.c_int8,
        'B': ctypes.c_uint8,
        'h': ctypes.c_int16,
        'H': ctypes.c_uint16,
        'i': ctypes.c_int32,
        'I': ctypes.c_uint32,
        'f': ctypes.c_float,
        'd': ctypes.c_double,
        'n': ctypes.c_char * 4,
        'N': ctypes.c_char * 16,
        'Z': ctypes.c_char * 64,
        'c': ctypes.c_int16,   # * 100,
        'C': ctypes.c_uint16,  # * 100,
        'e': ctypes.c_int32,   # * 100,
        'E': ctypes.c_uint32,  # * 100,
        'L': ctypes.c_int32,
        'M': ctypes.c_uint8,
        'q': ctypes.c_int64,
        'Q': ctypes.c_uint64,
    }

    FIELD_SCALE = {
        'c': 100,
        'C': 100,
        'e': 100,
        'E': 100,
    }

    _pack_ = True
    _fields_ = [
        ('head', logheader),
        ('type', ctypes.c_uint8),
        ('length', ctypes.c_uint8),
        ('name', ctypes.c_char * 4),
        ('types', ctypes.c_char * 16),
        ('labels', ctypes.c_char * 64),
    ]

    def __repr__(self):
        return "<{cls} {data}>".format(cls=self.__class__.__name__, data=' '.join(["{}:{}".format(k, getattr(self, k)) for (k, _) in self._fields_[1:]]))
    def to_class(self):
        members = dict(
            NAME = self.name,
            MSG = self.type,
            SIZE = self.length,
            labels = self.labels.split(",") if self.labels else [],
            _pack_ = True)

        fieldtypes = [i for i in self.types]
        fieldlabels = self.labels.split(",")
        if self.labels and (len(fieldtypes) != len(fieldlabels)):
            print("Broken FMT message for {} .. ignoring".format(self.name), file=sys.stderr)
            return None

        fields = [('head', logheader)]

        # field access
        for (label, _type) in zip(fieldlabels, fieldtypes):
            def createproperty(name, format):
                # extra scope for variable sanity
                # scaling via _NAME and def NAME(self): return self._NAME / SCALE
                propertyname = name
                attributename = '_' + name
                scale = BinaryFormat.FIELD_SCALE.get(format, None)
                p = property(lambda x: getattr(x, attributename))
                if scale is not None:
                    p = property(lambda x: getattr(x, attributename) / scale)
                members[propertyname] = p
                try:
                    fields.append((attributename, BinaryFormat.FIELD_FORMAT[format]))
                except KeyError:
                    print('ERROR: Failed to add FMT type: {}, with format: {}'.format(attributename, format))
                    raise
            createproperty(label, _type)
        members['_fields_'] = fields

        # repr shows all values but the header
        members['__repr__'] = lambda x: "<{cls} {data}>".format(cls=x.__class__.__name__, data=' '.join(["{}:{}".format(k, getattr(x, k)) for k in x.labels]))

        # finally, create the class
        cls = type(
            'Log__{:s}'.format(self.name),
            (ctypes.LittleEndianStructure,),
            members
        )

        if ctypes.sizeof(cls) != cls.SIZE:
            print("size mismatch for {} expected {} got {}".format(cls, cls.SIZE, ctypes.sizeof(cls)), file=sys.stderr)
            # for i in cls.labels:
            #     print("{} = {}".format(i,getattr(cls,'_'+i)))
            return None

        return cls

BinaryFormat.SIZE = ctypes.sizeof(BinaryFormat)


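# Sketch of how the self-describing binary format is consumed (file name is hypothetical;
# see _read_binary below for the real loop). Every record starts with the header bytes
# 0xA3 0x95 and a message id; id 128 is the FMT record, which describes the layout of
# every other record type:
#   data = bytearray(open("flight.bin", "rb").read())
#   fmt = BinaryFormat.from_buffer(data, 0)   # binary logs open with FMT records
#   cls = fmt.to_class()                      # ctypes structure for the message type it describes
#   cls.MSG, cls.SIZE                         # message id and record size used by _read_binary
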
class Channel(object):
    '''storage for a single stream of data, i.e. all GPS.RelAlt values'''

    # TODO: rethink data storage, but do more thorough regression testing before refactoring it
    # TODO: store data as a scipy spline curve so we can more easily interpolate and sample the slope?

    def __init__(self):
        self.dictData = {}  # dict of linenum->value   # store dupe data in dict and list for now, until we decide which is the better way to go
        self.listData = []  # list of (linenum,value)  # store dupe data in dict and list for now, until we decide which is the better way to go

    def getSegment(self, startLine, endLine):
        '''returns a segment of this data (from startLine to endLine, inclusive) as a new Channel instance'''
        segment = Channel()
        segment.dictData = {k: v for k, v in self.dictData.items() if k >= startLine and k <= endLine}
        segment.listData = [(k, v) for (k, v) in self.listData if k >= startLine and k <= endLine]
        return segment

    def min(self):
        return min(self.dictData.values())

    def max(self):
        return max(self.dictData.values())

    def avg(self):
        return numpy.mean(list(self.dictData.values()))

    def getNearestValueFwd(self, lineNumber):
        '''Returns (value,lineNumber)'''
        index = bisect.bisect_left(self.listData, (lineNumber, -99999))
        while index < len(self.listData):
            line = self.listData[index][0]
            #print("Looking forwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
            if line >= lineNumber:
                return (self.listData[index][1], line)
            index += 1
        raise Exception("Error finding nearest value for line %d" % lineNumber)

    def getNearestValueBack(self, lineNumber):
        '''Returns (value,lineNumber)'''
        index = bisect.bisect_left(self.listData, (lineNumber, -99999)) - 1
        while index >= 0:
            line = self.listData[index][0]
            #print("Looking backwards for nearest value to line number %d, starting at line %d" % (lineNumber,line)) # TEMP
            if line <= lineNumber:
                return (self.listData[index][1], line)
            index -= 1
        raise Exception("Error finding nearest value for line %d" % lineNumber)

    def getNearestValue(self, lineNumber, lookForwards=True):
        '''find the nearest data value to the given lineNumber, defaults to first looking forwards. Returns (value,lineNumber)'''
        if lookForwards:
            try:
                return self.getNearestValueFwd(lineNumber)
            except:
                return self.getNearestValueBack(lineNumber)
        else:
            try:
                return self.getNearestValueBack(lineNumber)
            except:
                return self.getNearestValueFwd(lineNumber)

    def getInterpolatedValue(self, lineNumber):
        '''linearly interpolate between the nearest values either side of the given line number'''
        (prevValue, prevValueLine) = self.getNearestValue(lineNumber, lookForwards=False)
        (nextValue, nextValueLine) = self.getNearestValue(lineNumber, lookForwards=True)
        if prevValueLine == nextValueLine:
            return prevValue
        weight = (lineNumber - prevValueLine) / float(nextValueLine - prevValueLine)
        return ((1 - weight) * prevValue) + (weight * nextValue)

    def getIndexOf(self, lineNumber):
        '''returns the index within this channel's listData of the given lineNumber, or raises an Exception if not found'''
        index = bisect.bisect_left(self.listData, (lineNumber, -99999))
        #print("INDEX of line %d: %d" % (lineNumber,index))
        #print("self.listData[index][0]: %d" % self.listData[index][0])
        if self.listData[index][0] == lineNumber:
            return index
        else:
            raise Exception("Error finding index for line %d" % lineNumber)


class LogIterator:
    '''Smart iterator that can move through a log by line number and maintain an index into the nearest values of all data channels'''
    # TODO: LogIterator currently indexes the next available value rather than the nearest value, we should make it configurable between next/nearest

    class LogIteratorSubValue:
        '''syntactic sugar to allow access by LogIterator[lineLabel][dataLabel]'''
        logdata = None
        iterators = None
        lineLabel = None

        def __init__(self, logdata, iterators, lineLabel):
            self.logdata = logdata
            self.lineLabel = lineLabel
            self.iterators = iterators

        def __getitem__(self, dataLabel):
            index = self.iterators[self.lineLabel][0]
            return self.logdata.channels[self.lineLabel][dataLabel].listData[index][1]

    def __init__(self, logdata, lineNumber=0):
        self.iterators = {}  # lineLabel -> (listIndex,lineNumber), per instance
        self.logdata = logdata
        self.currentLine = lineNumber
        for lineLabel in self.logdata.formats:
            if lineLabel in self.logdata.channels:
                self.iterators[lineLabel] = ()
        self.jump(lineNumber)

    def __iter__(self):
        return self

    def __getitem__(self, lineLabel):
        return LogIterator.LogIteratorSubValue(self.logdata, self.iterators, lineLabel)

    def next(self):
        '''increment iterator to next log line'''
        self.currentLine += 1
        if self.currentLine > self.logdata.lineCount:
            return self
        for lineLabel in self.iterators.keys():
            # check if the currentLine has gone past the line we're pointing to for this type of data
            dataLabel = self.logdata.formats[lineLabel].labels[0]
            (index, lineNumber) = self.iterators[lineLabel]
            # if so, and it is not the last entry in the log, then increment the indices for all dataLabels under that lineLabel
            if (self.currentLine > lineNumber) and (index < len(self.logdata.channels[lineLabel][dataLabel].listData) - 1):
                index += 1
                lineNumber = self.logdata.channels[lineLabel][dataLabel].listData[index][0]
                self.iterators[lineLabel] = (index, lineNumber)
        return self

    __next__ = next  # python3-style iteration

    def jump(self, lineNumber):
        '''jump iterator to specified log line'''
        self.currentLine = lineNumber
        for lineLabel in self.iterators.keys():
            dataLabel = self.logdata.formats[lineLabel].labels[0]
            (value, lineNumber) = self.logdata.channels[lineLabel][dataLabel].getNearestValue(self.currentLine)
            self.iterators[lineLabel] = (self.logdata.channels[lineLabel][dataLabel].getIndexOf(lineNumber), lineNumber)


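# Minimal usage sketch (channel and field names are illustrative; logdata is a parsed DataflashLog):
#   it = LogIterator(logdata, lineNumber=0)
#   while it.currentLine < logdata.lineCount:
#       alt = it["GPS"]["Alt"]   # nearest-following GPS.Alt value at the current line
#       it.next()
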
class DataflashLogHelper:
    '''helper functions for dealing with log data, put here to keep DataflashLog class as a simple parser and data store'''

    @staticmethod
    def getTimeAtLine(logdata, lineNumber):
        '''returns the nearest GPS timestamp in milliseconds after the given line number'''
        if "GPS" not in logdata.channels:
            raise Exception("no GPS log data found")
        # older logs use 'Time', newer logs use 'TimeMS', even newer logs use 'TimeUS'
        timeLabel = None
        for possible in "TimeMS", "Time", "TimeUS":
            if possible in logdata.channels["GPS"]:
                timeLabel = possible
                break
        if timeLabel is None:
            raise Exception("Unable to get time label")
        while lineNumber <= logdata.lineCount:
            if lineNumber in logdata.channels["GPS"][timeLabel].dictData:
                return logdata.channels["GPS"][timeLabel].dictData[lineNumber]
            lineNumber = lineNumber + 1

        sys.stderr.write("didn't find GPS data for " + str(lineNumber) + " - using maxtime\n")
        return logdata.channels["GPS"][timeLabel].max()
    @staticmethod
    def findLoiterChunks(logdata, minLengthSeconds=0, noRCInputs=True):
        '''returns a list of (startLine,endLine) pairs defining sections of the log which are in loiter mode, ordered from longest to shortest in time. If noRCInputs == True it only returns chunks with no control inputs'''
        # TODO: implement noRCInputs handling when identifying stable loiter chunks, for now we're ignoring it

        od = collections.OrderedDict(sorted(logdata.modeChanges.items(), key=lambda t: t[0]))
        modeLines = list(od.keys())
        modeValues = list(od.values())
        chunks = []
        for i in range(len(modeLines)):
            if modeValues[i][0] == "LOITER":
                startLine = modeLines[i]
                if i == len(modeLines) - 1:
                    endLine = logdata.lineCount
                else:
                    endLine = modeLines[i + 1] - 1
                chunkTimeSeconds = (DataflashLogHelper.getTimeAtLine(logdata, endLine) - DataflashLogHelper.getTimeAtLine(logdata, startLine) + 1) / 1000.0
                if chunkTimeSeconds > minLengthSeconds:
                    chunks.append((startLine, endLine))
                    #print("LOITER chunk: %d to %d, %d lines" % (startLine,endLine,endLine-startLine+1))
                    #print("  (time %d to %d, %d seconds)" % (DataflashLogHelper.getTimeAtLine(logdata,startLine), DataflashLogHelper.getTimeAtLine(logdata,endLine), chunkTimeSeconds))
        # longest chunks first
        chunks.sort(key=lambda chunk: chunk[1] - chunk[0], reverse=True)
        return chunks
    @staticmethod
    def isLogEmpty(logdata):
        '''returns a human-readable error string if the log is essentially empty, otherwise returns None'''
        # naive check for now, see if the throttle output was ever above 20%
        throttleThreshold = 20
        if logdata.vehicleType == VehicleType.Copter:
            throttleThreshold = 200  # copter uses 0-1000, plane+rover use 0-100
        if "CTUN" in logdata.channels:
            try:
                maxThrottle = logdata.channels["CTUN"]["ThrOut"].max()
            except KeyError as e:
                # ThrOut was shortened to ThO at some stage...
                maxThrottle = logdata.channels["CTUN"]["ThO"].max()
                # at roughly the same time ThO became a range from 0 to 1
                throttleThreshold = 0.2
            if maxThrottle < throttleThreshold:
                return "Throttle never above 20%"
        return None


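# Illustrative use of the helpers (logdata is a parsed DataflashLog instance):
#   DataflashLogHelper.isLogEmpty(logdata)     # -> None if the log contains real flight data
#   chunks = DataflashLogHelper.findLoiterChunks(logdata, minLengthSeconds=10)
#   for (startLine, endLine) in chunks:        # longest LOITER section first
#       startMs = DataflashLogHelper.getTimeAtLine(logdata, startLine)
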
class DataflashLog(object):
    '''APM Dataflash log file reader and container class. Keep this simple, add more advanced or specific functions to DataflashLogHelper class'''

    knownHardwareTypes = ["APM", "PX4", "MPNG"]
    intTypes = "bBhHiIM"
    floatTypes = "fcCeEL"
    charTypes = "nNZ"

    def __init__(self, logfile=None, format="auto", ignoreBadlines=False):
        self.filename = None

        self.vehicleType = None        # from VehicleType enumeration; value derived from header
        self.vehicleTypeString = None  # set at the same time as the enum value
        self.firmwareVersion = ""
        self.firmwareHash = ""
        self.freeRAM = 0
        self.hardwareType = ""         # APM 1, APM 2, PX4, MPNG, etc. (VRBrain? BeagleBone? needs more testing)

        self.formats = {}      # name -> Format
        self.parameters = {}   # token -> value
        self.messages = {}     # lineNum -> message
        self.modeChanges = {}  # lineNum -> (mode,value)
        self.channels = {}     # lineLabel -> {dataLabel:Channel}

        self.filesizeKB = 0
        self.durationSecs = 0
        self.lineCount = 0
        self.skippedLines = 0
        self.backpatch_these_modechanges = []
        self.frame = None

        if logfile:
            self.read(logfile, format, ignoreBadlines)
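
    # Minimal usage sketch (file name, parameter and channel labels are illustrative):
    #   logdata = DataflashLog("flight.log")    # format is auto-detected (text vs binary)
    #   logdata.parameters.get("RC3_MIN")       # parameter value, if present in the log
    #   logdata.channels["GPS"]["Spd"].max()    # peak value of the GPS.Spd channel
    #   logdata.modeChanges                     # lineNum -> (mode, value)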
    def getCopterType(self):
        '''returns quad/hex/octo/tradheli if this is a copter log'''
        if self.vehicleType != VehicleType.Copter:
            return None
        motLabels = []
        if "MOT" in self.formats:  # not listed in PX4 log header for some reason?
            motLabels = self.formats["MOT"].labels
        if "GGain" in motLabels:
            return "tradheli"
        elif len(motLabels) == 4:
            return "quad"
        elif len(motLabels) == 6:
            return "hex"
        elif len(motLabels) == 8:
            return "octo"
        else:
            return ""

    def num_motor_channels(self):
        motor_channels_for_frame = {
            "QUAD": 4,
            "HEXA": 6,
            "Y6": 6,
            "OCTA": 8,
            "OCTA_QUAD": 8,
            # "HELI": 1,
            # "HELI_DUAL": 2,
            "TRI": 3,
            "SINGLE": 1,
            "COAX": 2,
            "TAILSITTER": 1,
            "DODECA_HEXA": 12,
        }
        return motor_channels_for_frame[self.frame]
    def read(self, logfile, format="auto", ignoreBadlines=False):
        '''returns on successful log read (including bad lines if ignoreBadlines==True), will throw an Exception otherwise'''
        # TODO: dataflash log parsing code is pretty hacky, should re-write more methodically
        self.filename = logfile
        if self.filename == '<stdin>':
            f = sys.stdin
        else:
            f = open(self.filename, 'r')

        if format == 'bin':
            head = '\xa3\x95\x80\x80'
        elif format == 'log':
            head = ""
        elif format == 'auto':
            if self.filename == '<stdin>':
                # assuming TXT format
                # raise ValueError("Invalid log format for stdin: {}".format(format))
                head = ""
            else:
                head = f.read(4)
                f.seek(0)
        else:
            raise ValueError("Unknown log format for {}: {}".format(self.filename, format))

        if head == '\xa3\x95\x80\x80':
            numBytes, lineNumber = self.read_binary(f, ignoreBadlines)
        else:
            numBytes, lineNumber = self.read_text(f, ignoreBadlines)

        # gather some general stats about the log
        self.lineCount = lineNumber
        self.filesizeKB = numBytes / 1024.0
        # TODO: switch duration calculation to use TimeMS values rather than GPS timestamp
        if "GPS" in self.channels:
            # the GPS time label changed at some point, need to handle both
            timeLabel = None
            for i in 'TimeMS', 'TimeUS', 'Time':
                if i in self.channels["GPS"]:
                    timeLabel = i
                    break
            firstTimeGPS = int(self.channels["GPS"][timeLabel].listData[0][1])
            lastTimeGPS = int(self.channels["GPS"][timeLabel].listData[-1][1])
            if timeLabel == 'TimeUS':
                firstTimeGPS /= 1000
                lastTimeGPS /= 1000
            self.durationSecs = (lastTimeGPS - firstTimeGPS) / 1000

        # TODO: calculate logging rate based on timestamps
        # ...
    msg_vehicle_to_vehicle_map = {
        "ArduCopter": VehicleType.Copter,
        "APM:Copter": VehicleType.Copter,
        "ArduPlane": VehicleType.Plane,
        "ArduRover": VehicleType.Rover
    }

    # takes the vehicle type supplied via "MSG" and sets vehicleType from
    # the VehicleType enumeration
    def set_vehicleType_from_MSG_vehicle(self, MSG_vehicle):
        ret = self.msg_vehicle_to_vehicle_map.get(MSG_vehicle, None)
        if ret is None:
            raise ValueError("Unknown vehicle type (%s)" % (MSG_vehicle))
        self.vehicleType = ret
        self.vehicleTypeString = VehicleTypeString[ret]
    def handleModeChange(self, lineNumber, e):
        if self.vehicleType == VehicleType.Copter:
            modes = {
                0: 'STABILIZE',
                1: 'ACRO',
                2: 'ALT_HOLD',
                3: 'AUTO',
                4: 'GUIDED',
                5: 'LOITER',
                6: 'RTL',
                7: 'CIRCLE',
                9: 'LAND',
                10: 'OF_LOITER',
                11: 'DRIFT',
                13: 'SPORT',
                14: 'FLIP',
                15: 'AUTOTUNE',
                16: 'POSHOLD',
                17: 'BRAKE',
                18: 'THROW',
                19: 'AVOID_ADSB',
                20: 'GUIDED_NOGPS',
                21: 'SMART_RTL',
            }
            try:
                if hasattr(e, 'ThrCrs'):
                    self.modeChanges[lineNumber] = (modes[int(e.Mode)], e.ThrCrs)
                else:
                    # assume it has ModeNum:
                    self.modeChanges[lineNumber] = (modes[int(e.Mode)], e.ModeNum)
            except ValueError as x:
                if hasattr(e, 'ThrCrs'):
                    self.modeChanges[lineNumber] = (e.Mode, e.ThrCrs)
                else:
                    # some .log files spell the mode out by name rather than by
                    # number, contrary to the format string. Attempt to map that
                    # back to a number:
                    uppername = str(e.Mode).upper()
                    for num in modes:
                        if modes[num].upper() == uppername:
                            self.modeChanges[lineNumber] = (uppername, num)
                            return
                    # assume it has ModeNum:
                    print("Unknown mode=%u" % e.ModeNum)
                    self.modeChanges[lineNumber] = (e.Mode, "mode=%u" % e.ModeNum)
        elif self.vehicleType in [VehicleType.Plane, VehicleType.Copter, VehicleType.Rover]:
            self.modeChanges[lineNumber] = (e.Mode, e.ModeNum)
        else:
            # if you've gotten this far the chances are we don't
            # know what vehicle you're flying...
            raise Exception("Unknown log type for MODE line vehicletype=({}) line=({})".format(self.vehicleTypeString, repr(e)))

    def backPatchModeChanges(self):
        for (lineNumber, e) in self.backpatch_these_modechanges:
            self.handleModeChange(lineNumber, e)

    def set_frame(self, frame):
        self.frame = frame
    def process(self, lineNumber, e):
        if e.NAME == 'FMT':
            cls = e.to_class()
            if cls is not None:  # FMT messages can be broken ...
                if hasattr(e, 'type') and e.type not in self._formats:  # binary log specific
                    self._formats[e.type] = cls
                if cls.NAME not in self.formats:
                    self.formats[cls.NAME] = cls
        elif e.NAME == "PARM":
            self.parameters[e.Name] = e.Value
        elif e.NAME == "MSG":
            tokens = e.Message.split(' ')
            if not self.frame:
                if "Frame" in tokens[0]:
                    self.set_frame(tokens[1])
            if not self.vehicleType:
                try:
                    self.set_vehicleType_from_MSG_vehicle(tokens[0])
                except ValueError:
                    return
                self.backPatchModeChanges()
                self.firmwareVersion = tokens[1]
                if len(tokens) == 3:
                    self.firmwareHash = tokens[2][1:-1]
            else:
                self.messages[lineNumber] = e.Message
        elif e.NAME == "MODE":
            if self.vehicleType is None:
                self.backpatch_these_modechanges.append((lineNumber, e))
            else:
                self.handleModeChange(lineNumber, e)
        # anything else must be the log data
        else:
            groupName = e.NAME

            # first time seeing this type of log line, create the channel storage
            if groupName not in self.channels:
                self.channels[groupName] = {}
                for label in e.labels:
                    self.channels[groupName][label] = Channel()

            # store each token in its relevant channel
            for label in e.labels:
                value = getattr(e, label)
                channel = self.channels[groupName][label]
                channel.dictData[lineNumber] = value
                channel.listData.append((lineNumber, value))
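
    # For reference, a text dataflash log is a stream of comma-separated lines, each
    # starting with a message name declared by an earlier FMT line, roughly like
    # (illustrative values, not from a real flight):
    #   FMT, 128, 89, FMT, BBnNZ, Type,Length,Name,Format,Columns
    #   PARM, RC3_MIN, 1100
    #   MODE, LOITER, 25
    # read_text() below feeds each such line through the matching Format class and process().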
    def read_text(self, f, ignoreBadlines):
        self.formats = {'FMT': Format}
        lineNumber = 0
        numBytes = 0
        knownHardwareTypes = ["APM", "PX4", "MPNG"]
        for line in f:
            lineNumber = lineNumber + 1
            numBytes += len(line) + 1
            try:
                #print("Reading line: %d" % lineNumber)
                line = line.strip('\n\r')
                tokens = line.split(', ')
                # first handle the log header lines
                if line == " Ready to drive." or line == " Ready to FLY.":
                    continue
                if line == "----------------------------------------":  # present in pre-3.0 logs
                    raise Exception("Log file seems to be in the older format (prior to self-describing logs), which isn't supported")
                if len(tokens) == 1:
                    tokens2 = line.split(' ')
                    if line == "":
                        pass
                    elif len(tokens2) == 1 and tokens2[0].isdigit():  # log index
                        pass
                    elif len(tokens2) == 3 and tokens2[0] == "Free" and tokens2[1] == "RAM:":
                        self.freeRAM = int(tokens2[2])
                    elif tokens2[0] in knownHardwareTypes:
                        self.hardwareType = line  # not sure if we can parse this more usefully, for now only need to report it back verbatim
                    elif (len(tokens2) == 2 or len(tokens2) == 3) and tokens2[1][0].lower() == "v":  # e.g. ArduCopter V3.1 (5c6503e2)
                        try:
                            self.set_vehicleType_from_MSG_vehicle(tokens2[0])
                        except ValueError:
                            pass
                        self.firmwareVersion = tokens2[1]
                        if len(tokens2) == 3:
                            self.firmwareHash = tokens2[2][1:-1]
                    else:
                        errorMsg = "Error parsing line %d of log file: %s" % (lineNumber, self.filename)
                        if ignoreBadlines:
                            print(errorMsg + " (skipping line)", file=sys.stderr)
                            self.skippedLines += 1
                        else:
                            raise Exception(errorMsg)
                else:
                    if tokens[0] not in self.formats:
                        raise ValueError("Unknown Format {}".format(tokens[0]))
                    e = self.formats[tokens[0]](*tokens[1:])
                    self.process(lineNumber, e)
            except Exception as e:
                print("BAD LINE: " + line, file=sys.stderr)
                if not ignoreBadlines:
                    raise Exception("Error parsing line %d of log file %s - %s" % (lineNumber, self.filename, e.args[0]))
        return (numBytes, lineNumber)
    def read_binary(self, f, ignoreBadlines):
        lineNumber = 0
        numBytes = 0
        for e in self._read_binary(f, ignoreBadlines):
            lineNumber += 1
            if e is None:
                continue
            numBytes += e.SIZE
            # print(e)
            self.process(lineNumber, e)
        return (numBytes, lineNumber)

    def _read_binary(self, f, ignoreBadlines):
        self._formats = {128: BinaryFormat}
        data = bytearray(f.read())
        offset = 0
        while len(data) > offset + ctypes.sizeof(logheader):
            h = logheader.from_buffer(data, offset)
            if not (h.head1 == 0xa3 and h.head2 == 0x95):
                if not ignoreBadlines:
                    raise ValueError(h)
                else:
                    if h.head1 == 0xff and h.head2 == 0xff and h.msgid == 0xff:
                        print("Assuming EOF due to dataflash block tail filled with \\xff... (offset={off})".format(off=offset), file=sys.stderr)
                        break
                    offset += 1
                    continue

            if h.msgid in self._formats:
                typ = self._formats[h.msgid]
                if len(data) <= offset + typ.SIZE:
                    break
                try:
                    e = typ.from_buffer(data, offset)
                except:
                    print("data:{} offset:{} size:{} sizeof:{} sum:{}".format(len(data), offset, typ.SIZE, ctypes.sizeof(typ), offset + typ.SIZE))
                    raise
                offset += typ.SIZE
            else:
                raise ValueError(str(h) + " unknown type")
            yield e