You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
455 lines
16 KiB
455 lines
16 KiB
#!/usr/bin/env python
'''
parse a MAVLink protocol XML file and generate a python implementation

Copyright Andrew Tridgell 2011
Released under GNU GPL version 3 or later
'''

import sys, textwrap, os
import mavparse, mavtemplate

# Shared template writer: generate_preamble, generate_mavlink_class and
# generate_methods all emit their code through t.write().
t = mavtemplate.MAVTemplate()
|
|
|
def generate_preamble(outf, msgs, args, xml):
    '''write the fixed head of the generated module

    Emits, through the shared template writer ``t``, the module docstring,
    the imports, WIRE_PROTOCOL_VERSION and the MAVLink_header and
    MAVLink_message base classes.

    outf -- output object handed to t.write()
    msgs -- accepted for symmetry with the other generators; not used here
    args -- sequence of strings, comma-joined into the "Generated from" line
    xml  -- object providing protocol_marker, crc_extra and
            wire_protocol_version

    NOTE(review): ${NAME} placeholders are presumably expanded by
    mavtemplate.MAVTemplate.write from the mapping passed as the last
    argument; that code is not visible in this file.
    '''
    print("Generating preamble")
    # The template below is emitted verbatim, so its indentation is the
    # indentation of the generated Python source.
    t.write(outf, """
'''
MAVLink protocol implementation (auto-generated by mavgen.py)

Generated from: ${FILELIST}

Note: this file has been auto-generated. DO NOT EDIT
'''

import struct, array, mavutil, time

WIRE_PROTOCOL_VERSION = "${WIRE_PROTOCOL_VERSION}"

class MAVLink_header(object):
    '''MAVLink message header'''
    def __init__(self, msgId, mlen=0, seq=0, srcSystem=0, srcComponent=0):
        self.mlen = mlen
        self.seq = seq
        self.srcSystem = srcSystem
        self.srcComponent = srcComponent
        self.msgId = msgId

    def pack(self):
        return struct.pack('BBBBBB', ${PROTOCOL_MARKER}, self.mlen, self.seq,
                           self.srcSystem, self.srcComponent, self.msgId)

class MAVLink_message(object):
    '''base MAVLink message class'''
    def __init__(self, msgId, name):
        self._header = MAVLink_header(msgId)
        self._payload = None
        self._msgbuf = None
        self._crc = None
        self._fieldnames = []
        self._type = name

    def get_msgbuf(self):
        return self._msgbuf

    def get_header(self):
        return self._header

    def get_payload(self):
        return self._payload

    def get_crc(self):
        return self._crc

    def get_fieldnames(self):
        return self._fieldnames

    def get_type(self):
        return self._type

    def get_msgId(self):
        return self._header.msgId

    def get_srcSystem(self):
        return self._header.srcSystem

    def get_srcComponent(self):
        return self._header.srcComponent

    def get_seq(self):
        return self._header.seq

    def __str__(self):
        ret = '%s {' % self._type
        for a in self._fieldnames:
            v = getattr(self, a)
            ret += '%s : %s, ' % (a, v)
        ret = ret[0:-2] + '}'
        return ret

    def pack(self, mav, crc_extra, payload):
        self._payload = payload
        self._header = MAVLink_header(self._header.msgId, len(payload), mav.seq,
                                      mav.srcSystem, mav.srcComponent)
        self._msgbuf = self._header.pack() + payload
        crc = mavutil.x25crc(self._msgbuf[1:])
        if ${crc_extra}: # using CRC extra
            crc.accumulate(chr(crc_extra))
        self._crc = crc.crc
        self._msgbuf += struct.pack('<H', self._crc)
        return self._msgbuf

""", {'FILELIST' : ",".join(args),
      'PROTOCOL_MARKER' : xml.protocol_marker,
      'crc_extra' : xml.crc_extra,
      'WIRE_PROTOCOL_VERSION' : xml.wire_protocol_version })
|
|
|
|
|
def generate_enums(outf, enums):
    '''write one module-level constant per enum entry

    outf  -- object with a write() method receiving the generated text
    enums -- iterable of objects with a ``name`` and an ``entry`` list;
             each entry provides ``name``, ``value`` and ``description``

    Every entry becomes ``NAME = VALUE # description``.  Long descriptions
    are wrapped by textwrap, and each continuation line starts with the
    " # " prefix so it remains a comment in the generated file.
    '''
    print("Generating enums")
    outf.write("\n# enums\n")
    wrapper = textwrap.TextWrapper(initial_indent="", subsequent_indent=" # ")
    for enum in enums:
        outf.write("\n# %s\n" % enum.name)
        for entry in enum.entry:
            description = wrapper.fill(entry.description)
            outf.write("%s = %u # %s\n" % (entry.name, entry.value, description))
|
|
|
def generate_message_ids(outf, msgs):
    '''write the MAVLINK_MSG_ID_* constants

    outf -- object with a write() method receiving the generated text
    msgs -- iterable of message objects providing ``name`` and ``id``

    MAVLINK_MSG_ID_BAD_DATA = -1 is always written first; each message
    then contributes MAVLINK_MSG_ID_<NAME upper-cased> = <id>.
    '''
    print("Generating message IDs")
    outf.write("\n# message IDs\n")
    outf.write("MAVLINK_MSG_ID_BAD_DATA = -1\n")
    for m in msgs:
        outf.write("MAVLINK_MSG_ID_%s = %u\n" % (m.name.upper(), m.id))
|
|
|
def generate_classes(outf, msgs):
    '''write one MAVLink_<name>_message class per message

    outf -- object with a write() method receiving the generated text
    msgs -- iterable of message objects providing ``name``,
            ``description``, ``fields`` (objects with ``name``),
            ``fieldnames``, ``ordered_fieldnames``, ``crc_extra`` and
            ``fmtstr``

    Each generated class subclasses MAVLink_message, stores every field as
    an attribute in __init__, and has a pack(mav) method that struct-packs
    the fields in ``ordered_fieldnames`` order using ``fmtstr``.
    '''
    print("Generating class definitions")
    # The string literals below are emitted verbatim: 4 spaces is the body
    # of the generated class, 8 spaces the body of its methods.
    wrapper = textwrap.TextWrapper(initial_indent="    ", subsequent_indent="    ")
    for m in msgs:
        upper = m.name.upper()
        outf.write("""
class MAVLink_%s_message(MAVLink_message):
    '''
%s
    '''
    def __init__(self""" % (m.name.lower(), wrapper.fill(m.description.strip())))
        if m.fields:
            outf.write(", " + ", ".join(m.fieldnames))
        outf.write("):\n")
        outf.write("        MAVLink_message.__init__(self, MAVLINK_MSG_ID_%s, '%s')\n" % (upper, upper))
        if m.fieldnames:
            outf.write("        self._fieldnames = ['%s']\n" % "', '".join(m.fieldnames))
        for f in m.fields:
            outf.write("        self.%s = %s\n" % (f.name, f.name))
        outf.write("""
    def pack(self, mav):
        return MAVLink_message.pack(self, mav, %u, struct.pack('%s'""" % (m.crc_extra, m.fmtstr))
        if m.fields:
            # wire order may differ from declaration order
            outf.write(", self." + ", self.".join(m.ordered_fieldnames))
        outf.write("))\n")
|
|
|
|
|
def mavfmt(field):
    '''work out the struct format for a type

    field -- object providing ``type`` (a MAVLink type name) and
             ``array_length`` (falsy for a scalar field)

    Returns the struct-module format for the field: a single code for a
    scalar, ``<n>s`` for arrays of char/int8_t/uint8_t, and ``<n><code>``
    for arrays of any other type.

    Raises KeyError when the type name is not in the table (byte-sized
    array types return before the table is consulted).
    '''
    # named type_codes rather than "map" so the builtin is not shadowed
    type_codes = {
        'float'    : 'f',
        'double'   : 'd',
        'char'     : 'c',
        'int8_t'   : 'b',
        'uint8_t'  : 'B',
        'uint8_t_mavlink_version'  : 'B',
        'int16_t'  : 'h',
        'uint16_t' : 'H',
        'int32_t'  : 'i',
        'uint32_t' : 'I',
        'int64_t'  : 'q',
        'uint64_t' : 'Q',
        }

    if field.array_length:
        if field.type in ('char', 'int8_t', 'uint8_t'):
            # byte-sized arrays are packed as a single string
            return str(field.array_length) + 's'
        return str(field.array_length) + type_codes[field.type]
    return type_codes[field.type]
|
|
|
def generate_mavlink_class(outf, msgs, xml):
    '''write mavlink_map and the runtime classes of the generated module

    outf -- object with a write() method, also handed to t.write()
    msgs -- iterable of message objects providing ``name``, ``fmtstr``,
            ``order_map`` and ``crc_extra``
    xml  -- passed to t.write() as the source of the ${protocol_marker},
            ${little_endian}, ${crc_extra} and ${sort_fields} placeholders

    Emits the ``mavlink_map`` dict (message id -> format, class, order map,
    crc extra) followed by MAVError, MAVString, MAVLink_bad_data and the
    MAVLink class (send / parse_char / parse_buffer / decode).

    The emitted send() wraps the sequence counter modulo 256 so that every
    value of the one-byte sequence field is used before it restarts at 0.

    NOTE(review): placeholder expansion presumably happens in
    mavtemplate.MAVTemplate.write, which is not visible in this file.
    '''
    print("Generating MAVLink class")

    outf.write("\n\nmavlink_map = {\n")
    for m in msgs:
        outf.write("    MAVLINK_MSG_ID_%s : ( '%s', MAVLink_%s_message, %s, %u ),\n" % (
            m.name.upper(), m.fmtstr, m.name.lower(), m.order_map, m.crc_extra))
    outf.write("}\n\n")

    # The template below is emitted verbatim, so its indentation is the
    # indentation of the generated Python source.
    t.write(outf, """
class MAVError(Exception):
    '''MAVLink error class'''
    def __init__(self, msg):
        Exception.__init__(self, msg)
        self.message = msg

class MAVString(str):
    '''NUL terminated string'''
    def __init__(self, s):
        str.__init__(self)
    def __str__(self):
        i = self.find(chr(0))
        if i == -1:
            return self[:]
        return self[0:i]

class MAVLink_bad_data(MAVLink_message):
    '''
    a piece of bad data in a mavlink stream
    '''
    def __init__(self, data, reason):
        MAVLink_message.__init__(self, MAVLINK_MSG_ID_BAD_DATA, 'BAD_DATA')
        self._fieldnames = ['data', 'reason']
        self.data = data
        self.reason = reason
        self._msgbuf = data

class MAVLink(object):
    '''MAVLink protocol handling class'''
    def __init__(self, file, srcSystem=0, srcComponent=0):
        self.seq = 0
        self.file = file
        self.srcSystem = srcSystem
        self.srcComponent = srcComponent
        self.callback = None
        self.callback_args = None
        self.callback_kwargs = None
        self.buf = array.array('B')
        self.expected_length = 6
        self.have_prefix_error = False
        self.robust_parsing = False
        self.protocol_marker = ${protocol_marker}
        self.little_endian = ${little_endian}
        self.crc_extra = ${crc_extra}
        self.sort_fields = ${sort_fields}
        self.total_packets_sent = 0
        self.total_bytes_sent = 0
        self.total_packets_received = 0
        self.total_bytes_received = 0
        self.total_receive_errors = 0
        self.startup_time = time.time()

    def set_callback(self, callback, *args, **kwargs):
        self.callback = callback
        self.callback_args = args
        self.callback_kwargs = kwargs

    def send(self, mavmsg):
        '''send a MAVLink message'''
        buf = mavmsg.pack(self)
        self.file.write(buf)
        self.seq = (self.seq + 1) % 256
        self.total_packets_sent += 1
        self.total_bytes_sent += len(buf)

    def bytes_needed(self):
        '''return number of bytes needed for next parsing stage'''
        ret = self.expected_length - len(self.buf)
        if ret <= 0:
            return 1
        return ret

    def parse_char(self, c):
        '''input some data bytes, possibly returning a new message'''
        if isinstance(c, str):
            self.buf.fromstring(c)
        else:
            self.buf.extend(c)
        self.total_bytes_received += len(c)
        if len(self.buf) >= 1 and self.buf[0] != ${protocol_marker}:
            magic = self.buf[0]
            self.buf = self.buf[1:]
            if self.robust_parsing:
                m = MAVLink_bad_data(chr(magic), "Bad prefix")
                if self.callback:
                    self.callback(m, *self.callback_args, **self.callback_kwargs)
                self.expected_length = 6
                self.total_receive_errors += 1
                return m
            if self.have_prefix_error:
                return None
            self.have_prefix_error = True
            self.total_receive_errors += 1
            raise MAVError("invalid MAVLink prefix '%s'" % magic)
        self.have_prefix_error = False
        if len(self.buf) >= 2:
            (magic, self.expected_length) = struct.unpack('BB', self.buf[0:2])
            self.expected_length += 8
        if self.expected_length >= 8 and len(self.buf) >= self.expected_length:
            mbuf = self.buf[0:self.expected_length]
            self.buf = self.buf[self.expected_length:]
            self.expected_length = 6
            if self.robust_parsing:
                try:
                    m = self.decode(mbuf)
                    self.total_packets_received += 1
                except MAVError as reason:
                    m = MAVLink_bad_data(mbuf, reason.message)
                    self.total_receive_errors += 1
            else:
                m = self.decode(mbuf)
                self.total_packets_received += 1
            if self.callback:
                self.callback(m, *self.callback_args, **self.callback_kwargs)
            return m
        return None

    def parse_buffer(self, s):
        '''input some data bytes, possibly returning a list of new messages'''
        m = self.parse_char(s)
        if m is None:
            return None
        ret = [m]
        while True:
            m = self.parse_char("")
            if m is None:
                return ret
            ret.append(m)

    def decode(self, msgbuf):
        '''decode a buffer as a MAVLink message'''
        # decode the header
        try:
            magic, mlen, seq, srcSystem, srcComponent, msgId = struct.unpack('cBBBBB', msgbuf[:6])
        except struct.error as emsg:
            raise MAVError('Unable to unpack MAVLink header: %s' % emsg)
        if ord(magic) != ${protocol_marker}:
            raise MAVError("invalid MAVLink prefix '%s'" % magic)
        if mlen != len(msgbuf)-8:
            raise MAVError('invalid MAVLink message length. Got %u expected %u, msgId=%u' % (len(msgbuf)-8, mlen, msgId))

        if not msgId in mavlink_map:
            raise MAVError('unknown MAVLink message ID %u' % msgId)

        # decode the payload
        (fmt, type, order_map, crc_extra) = mavlink_map[msgId]

        # decode the checksum
        try:
            crc, = struct.unpack('<H', msgbuf[-2:])
        except struct.error as emsg:
            raise MAVError('Unable to unpack MAVLink CRC: %s' % emsg)
        crc2 = mavutil.x25crc(msgbuf[1:-2])
        if ${crc_extra}: # using CRC extra
            crc2.accumulate(chr(crc_extra))
        if crc != crc2.crc:
            raise MAVError('invalid MAVLink CRC in msgID %u 0x%04x should be 0x%04x' % (msgId, crc, crc2.crc))

        try:
            t = struct.unpack(fmt, msgbuf[6:-2])
        except struct.error as emsg:
            raise MAVError('Unable to unpack MAVLink payload type=%s fmt=%s payloadLength=%u: %s' % (
                type, fmt, len(msgbuf[6:-2]), emsg))

        tlist = list(t)
        # handle sorted fields
        if ${sort_fields}:
            t = tlist[:]
            for i in range(0, len(tlist)):
                tlist[i] = t[order_map[i]]

        # terminate any strings
        for i in range(0, len(tlist)):
            if isinstance(tlist[i], str):
                tlist[i] = MAVString(tlist[i])
        t = tuple(tlist)
        # construct the message object
        try:
            m = type(*t)
        except Exception as emsg:
            raise MAVError('Unable to instantiate MAVLink message of type %s : %s' % (type, emsg))
        m._msgbuf = msgbuf
        m._payload = msgbuf[6:-2]
        m._crc = crc
        m._header = MAVLink_header(msgId, mlen, seq, srcSystem, srcComponent)
        return m
""", xml)
|
|
|
def generate_methods(outf, msgs):
    '''write <name>_encode and <name>_send methods for every message

    outf -- output object handed to t.write()
    msgs -- iterable of message objects providing ``name``,
            ``description``, ``fieldnames`` and ``fields``; each field
            provides ``name``, ``description``, ``type``, ``omit_arg`` and
            ``const_value``

    The methods are emitted indented one level, i.e. as members of whatever
    class body precedes them in the output.  Fields flagged ``omit_arg``
    become keyword parameters defaulting to ``const_value``.
    '''
    print("Generating methods")

    def field_descriptions(fields):
        '''return one "name : description (type)" line per field'''
        return "".join(
            "        %-18s : %s (%s)\n" % (f.name, f.description.strip(), f.type)
            for f in fields)

    # continuation lines are indented to the level of the emitted docstring
    wrapper = textwrap.TextWrapper(initial_indent="", subsequent_indent="        ")

    for m in msgs:
        comment = "%s\n\n%s" % (wrapper.fill(m.description.strip()), field_descriptions(m.fields))

        # build the parameter list "self, a, b=<const>, ..."
        params = ['self']
        for f in m.fields:
            if f.omit_arg:
                params.append('%s=%s' % (f.name, f.const_value))
            else:
                params.append(f.name)
        selffieldnames = ', '.join(params)

        sub = {'NAMELOWER'      : m.name.lower(),
               'SELFFIELDNAMES' : selffieldnames,
               'COMMENT'        : comment,
               'FIELDNAMES'     : ", ".join(m.fieldnames)}

        t.write(outf, """
    def ${NAMELOWER}_encode(${SELFFIELDNAMES}):
        '''
        ${COMMENT}
        '''
        msg = MAVLink_${NAMELOWER}_message(${FIELDNAMES})
        msg.pack(self)
        return msg

""", sub)

        t.write(outf, """
    def ${NAMELOWER}_send(${SELFFIELDNAMES}):
        '''
        ${COMMENT}
        '''
        return self.send(self.${NAMELOWER}_encode(${FIELDNAMES}))

""", sub)
|
|
|
|
|
def generate(basename, xml):
    '''generate complete python implementation

    basename -- output path; '.py' is appended unless already present
    xml      -- non-empty sequence of parsed XML definitions, each providing
                ``message``, ``enum`` and ``filename``; the first one also
                supplies ``little_endian`` and the values used by the
                preamble and MAVLink class generators

    Sets ``fmtstr`` and ``order_map`` on every message, then writes the
    generated module to the output file.

    Raises IndexError if ``xml`` is empty.
    '''
    filename = basename if basename.endswith('.py') else basename + '.py'

    msgs = []
    enums = []
    filelist = []
    for x in xml:
        msgs.extend(x.message)
        enums.extend(x.enum)
        filelist.append(os.path.basename(x.filename))

    # byte order is taken from the first definition for all messages
    byte_order = '<' if xml[0].little_endian else '>'
    for m in msgs:
        m.fmtstr = byte_order + ''.join(mavfmt(f) for f in m.ordered_fields)
        # order_map[i] is the wire position of the i-th declared field
        m.order_map = [m.ordered_fieldnames.index(name) for name in m.fieldnames]

    print("Generating %s" % filename)
    # the context manager closes the file even if a generator step raises
    with open(filename, "w") as outf:
        generate_preamble(outf, msgs, filelist, xml[0])
        generate_enums(outf, enums)
        generate_message_ids(outf, msgs)
        generate_classes(outf, msgs)
        generate_mavlink_class(outf, msgs, xml[0])
        generate_methods(outf, msgs)
    print("Generated %s OK" % filename)
|
|
|