Browse Source

autotest: add tests for LTM telemetry output

zr-v5.1
Peter Barker 5 years ago committed by Peter Barker
parent
commit
b0aceaea70
  1. 4
      Tools/autotest/arduplane.py
  2. 255
      Tools/autotest/common.py

4
Tools/autotest/arduplane.py

@ -1713,6 +1713,10 @@ class AutoTestPlane(AutoTest): @@ -1713,6 +1713,10 @@ class AutoTestPlane(AutoTest):
"Test FrSkyD serial output",
self.test_frsky_d),
("LTM",
"Test LTM serial output",
self.test_ltm),
("AdvancedFailsafe",
"Test Advanced Failsafe",
self.test_advanced_failsafe),

255
Tools/autotest/common.py

@ -216,6 +216,152 @@ class Telem(object): @@ -216,6 +216,152 @@ class Telem(object):
return
self.update_read()
class LTM(Telem):
def __init__(self, destination_address):
super(LTM, self).__init__(destination_address)
self.HEADER1 = 0x24
self.HEADER2 = 0x54
self.FRAME_G = 0x47
self.FRAME_A = 0x41
self.FRAME_S = 0x53
self.frame_lengths = {
self.FRAME_G: 18,
self.FRAME_A: 10,
self.FRAME_S: 11,
}
self.frame_lengths = {
self.FRAME_G: 18,
self.FRAME_A: 10,
self.FRAME_S: 11,
}
self.data_by_id = {}
self.frames = {}
def g(self):
return self.frames.get(self.FRAME_G, None)
def a(self):
return self.frames.get(self.FRAME_A, None)
def s(self):
return self.frames.get(self.FRAME_S, None)
def progress(self, message):
print("LTM: %s" % message)
def handle_data(self, dataid, value):
self.progress("%u=%u" % (dataid, value))
self.data_by_id[dataid] = value
def consume_frame(self):
frame_type = ord(self.buffer[2])
frame_length = self.frame_lengths[frame_type]
# check frame CRC
crc = 0
count = 0
for c in self.buffer[3:frame_length-1]:
old = crc
crc ^= ord(c)
count += 1
if crc != ord(self.buffer[frame_length-1]):
raise NotAchievedException("Invalid checksum on frame type %s" % str(chr(frame_type)))
# self.progress("Received valid %s frame" % str(chr(frame_type)))
class Frame(object):
def __init__(self, buffer):
self.buffer = buffer
def intn(self, offset, count):
ret = 0
for i in range(offset, offset+count):
print("byte: %02x" % ord(self.buffer[i]))
ret = ret | (ord(self.buffer[i]) << ((i-offset)*8))
return ret
def int32(self, offset):
t = struct.unpack("<i", self.buffer[offset:offset+4])
return t[0]
# return self.intn(offset, 4)
def int16(self, offset):
t = struct.unpack("<h", self.buffer[offset:offset+2])
return t[0]
# return self.intn(offset, 2)
class FrameG(Frame):
def __init__(self, buffer):
super(FrameG, self,).__init__(buffer)
def lat(self):
return self.int32(3)
def lon(self):
return self.int32(7)
def gndspeed(self):
return ord(self.buffer[11])
def alt(self):
return self.int32(12)
def sats(self):
s = ord(self.buffer[16])
return (s>>2)
def fix_type(self):
s = ord(self.buffer[16])
return s & 0b11
class FrameA(Frame):
def __init__(self, buffer):
super(FrameA, self,).__init__(buffer)
def pitch(self):
return self.int16(3)
def roll(self):
return self.int16(5)
def hdg(self):
return self.int16(7)
class FrameS(Frame):
def __init__(self, buffer):
super(FrameS, self,).__init__(buffer)
if frame_type == self.FRAME_G:
frame = FrameG(self.buffer[0:frame_length-1])
elif frame_type == self.FRAME_A:
frame = FrameA(self.buffer[0:frame_length-1])
elif frame_type == self.FRAME_S:
frame = FrameS(self.buffer[0:frame_length-1])
else:
raise NotAchievedException("Bad frame?!?!?!")
self.buffer = self.buffer[frame_length:]
self.frames[frame_type] = frame
def update_read(self):
self.buffer += self.do_read()
while len(self.buffer):
if len(self.buffer) == 0:
break
if sys.version_info.major >= 3:
b = self.buffer[0]
else:
b = ord(self.buffer[0])
if ord(self.buffer[0]) != self.HEADER1:
self.bad_chars += 1
self.buffer = self.buffer[1:]
continue
if ord(self.buffer[1]) != self.HEADER2:
self.bad_chars += 1
self.buffer = self.buffer[1:]
continue
if ord(self.buffer[2]) not in [self.FRAME_G, self.FRAME_A, self.FRAME_S]:
self.bad_chars += 1
self.buffer = self.buffer[1:]
continue
frame_len = self.frame_lengths[ord(self.buffer[2])]
if len(self.buffer) < frame_len:
continue
self.consume_frame()
def get_data(self, dataid):
try:
return self.data_by_id[dataid]
except KeyError as e:
pass
return None
class FRSky(Telem):
def __init__(self, destination_address):
super(FRSky, self).__init__(destination_address)
@ -4433,6 +4579,115 @@ switch value''' @@ -4433,6 +4579,115 @@ switch value'''
if alt == gpi_abs_alt:
break
def test_ltm_g(self, ltm):
g = ltm.g()
if g is None:
return
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
print("m: %s" % str(m))
print("g.lat=%s m.lat=%s" % (str(g.lat()), str(m.lat)))
if abs(m.lat - g.lat()) > 10:
return False
print("g.lon:%s m.lon:%s" % (str(g.lon()), str(m.lon)))
if abs(m.lon - g.lon()) > 10:
return False
print("gndspeed: %s" % str(g.gndspeed()))
if g.gndspeed() != 0:
# FIXME if we start the vehicle moving.... check against VFR_HUD?
return False
print("g.alt=%s m.alt=%s" % (str(g.alt()/100.0), str(m.relative_alt/1000.0)))
if abs(m.relative_alt/1000.0 - g.alt()/100.0) > 1:
return False
print("sats: %s" % str(g.sats()))
m = self.mav.recv_match(type='GPS_RAW_INT', blocking=True)
if m.satellites_visible != g.sats():
return False
constrained_fix_type = m.fix_type
if constrained_fix_type > 3:
constrained_fix_type = 3
print("fix_type: %s" % g.fix_type())
if constrained_fix_type != g.fix_type():
return False
return True
def test_ltm_a(self, ltm):
a = ltm.a()
if a is None:
return
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
pitch = a.pitch()
print("pitch: %s" % str(pitch))
if abs(math.degrees(m.pitch) - pitch) > 1:
return False
roll = a.roll()
print("roll: %s" % str(roll))
if abs(math.degrees(m.roll) - roll) > 1:
return False
hdg = a.hdg()
myaw = math.degrees(m.yaw)
myaw += 360
myaw %= 360
print("a.hdg=%s m.hdg=%s" % (str(hdg), str(myaw)))
if abs(myaw - hdg) > 1:
return False
return True
def test_ltm_s(self, ltm):
s = ltm.s()
if s is None:
return
# FIXME. Actually check the field values are correct :-)
return True
def test_ltm(self):
self.set_parameter("SERIAL5_PROTOCOL", 25) # serial5 is LTM output
self.customise_SITL_commandline([
"--uartF=tcp:6735" # serial5 spews to localhost:6735
])
ltm = LTM(("127.0.0.1", 6735))
self.wait_ready_to_arm()
self.drain_mav_unparsed()
m = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if m is None:
raise NotAchievedException("Did not receive GLOBAL_POSITION_INT")
gpi_abs_alt = int(m.alt / 1000) # mm -> m
wants = {
"g": self.test_ltm_g,
"a": self.test_ltm_a,
"s": self.test_ltm_s,
}
tstart = self.get_sim_time_cached()
while True:
self.progress("Still wanting (%s)" %
",".join([ ("%s" % x) for x in wants.keys()]))
if len(wants) == 0:
break
now = self.get_sim_time_cached()
if now - tstart > 10:
raise AutoTestTimeoutException("Failed to get ltm data")
ltm.update()
wants_copy = copy.copy(wants)
for want in wants_copy:
self.progress("Checking %s" % str(want))
if wants[want](ltm):
self.progress(" Fulfilled")
del wants[want]
def tests(self):
return [
("PIDTuning",

Loading…
Cancel
Save