Browse Source

autotest: use assert_receive_message

Removes several block-forever calls
master
Peter Barker 2 years ago committed by Peter Barker
parent
commit
8a13fb0f6d
  1. 101
      Tools/autotest/common.py

101
Tools/autotest/common.py

@ -3111,12 +3111,9 @@ class AutoTest(ABC): @@ -3111,12 +3111,9 @@ class AutoTest(ABC):
1, # target component
log_id,
log_id)
log_entry = self.mav.recv_match(type='LOG_ENTRY',
blocking=True,
timeout=2)
log_entry = self.assert_receive_message('LOG_ENTRY', timeout=2, verbose=True)
if log_entry.size != len(actual_bytes):
raise NotAchievedException("Incorrect bytecount")
self.progress("Using log entry (%s)" % str(log_entry))
if log_entry.id != log_id:
raise NotAchievedException("Incorrect log id received")
@ -5667,11 +5664,7 @@ class AutoTest(ABC): @@ -5667,11 +5664,7 @@ class AutoTest(ABC):
def get_altitude(self, relative=False, timeout=30):
'''returns vehicles altitude in metres, possibly relative-to-home'''
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT',
blocking=True,
timeout=timeout)
if msg is None:
raise MsgRcvTimeoutException("Failed to get Global Position")
msg = self.assert_receive_message("GLOBAL_POSITION_INT", timeout=timeout)
if relative:
return msg.relative_alt / 1000.0 # mm -> m
return msg.alt / 1000.0 # mm -> m
@ -5805,10 +5798,8 @@ class AutoTest(ABC): @@ -5805,10 +5798,8 @@ class AutoTest(ABC):
assert speed_min <= speed_max, "Minimum speed should be less than maximum speed."
def get_climbrate(timeout2):
msg = self.mav.recv_match(type='VFR_HUD', blocking=True, timeout=timeout2)
if msg:
return msg.climb
raise MsgRcvTimeoutException("Failed to get climb rate")
msg = self.assert_receive_message('VFR_HUD', timeout=timeout2)
return msg.climb
def validator(value2, target2=None):
if speed_min <= value2 <= speed_max:
@ -5837,10 +5828,8 @@ class AutoTest(ABC): @@ -5837,10 +5828,8 @@ class AutoTest(ABC):
assert speed_min <= speed_max, "Minimum speed should be less than maximum speed."
def get_speed(timeout2):
msg = self.mav.recv_match(type='VFR_HUD', blocking=True, timeout=timeout2)
if msg:
return getattr(msg, field)
raise MsgRcvTimeoutException("Failed to get %s" % field)
msg = self.assert_receive_message('VFR_HUD', timeout=timeout2)
return getattr(msg, field)
self.wait_and_maintain_range(
value_name=field,
@ -5855,13 +5844,11 @@ class AutoTest(ABC): @@ -5855,13 +5844,11 @@ class AutoTest(ABC):
def wait_roll(self, roll, accuracy, timeout=30, **kwargs):
"""Wait for a given roll in degrees."""
def get_roll(timeout2):
msg = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=timeout2)
if msg:
p = math.degrees(msg.pitch)
r = math.degrees(msg.roll)
self.progress("Roll %d Pitch %d" % (r, p))
return r
raise MsgRcvTimeoutException("Failed to get Roll")
msg = self.assert_receive_message('ATTITUDE', timeout=timeout2)
p = math.degrees(msg.pitch)
r = math.degrees(msg.roll)
self.progress("Roll %d Pitch %d" % (r, p))
return r
def validator(value2, target2):
return math.fabs((value2 - target2 + 180) % 360 - 180) <= accuracy
@ -5879,13 +5866,11 @@ class AutoTest(ABC): @@ -5879,13 +5866,11 @@ class AutoTest(ABC):
def wait_pitch(self, pitch, accuracy, timeout=30, **kwargs):
"""Wait for a given pitch in degrees."""
def get_pitch(timeout2):
msg = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=timeout2)
if msg:
p = math.degrees(msg.pitch)
r = math.degrees(msg.roll)
self.progress("Pitch %d Roll %d" % (p, r))
return p
raise MsgRcvTimeoutException("Failed to get Pitch")
msg = self.assert_receive_message('ATTITUDE', timeout=timeout2)
p = math.degrees(msg.pitch)
r = math.degrees(msg.roll)
self.progress("Pitch %d Roll %d" % (p, r))
return p
def validator(value2, target2):
return math.fabs((value2 - target2 + 180) % 360 - 180) <= accuracy
@ -6118,10 +6103,8 @@ class AutoTest(ABC): @@ -6118,10 +6103,8 @@ class AutoTest(ABC):
def wait_yaw_speed(self, yaw_speed, accuracy=0.1, timeout=30, **kwargs):
"""Wait for a given yaw speed in radians per second."""
def get_yawspeed(timeout2):
msg = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=timeout2)
if msg:
return msg.yawspeed
raise MsgRcvTimeoutException("Failed to get yaw speed")
msg = self.assert_receive_message('ATTITUDE', timeout=timeout2)
return msg.yawspeed
def validator(value2, target2):
return math.fabs(value2 - target2) <= accuracy
@ -6177,12 +6160,8 @@ class AutoTest(ABC): @@ -6177,12 +6160,8 @@ class AutoTest(ABC):
)
def get_body_frame_velocity(self):
gri = self.mav.recv_match(type='GPS_RAW_INT', blocking=True, timeout=1)
if gri is None:
raise NotAchievedException("No GPS_RAW_INT")
att = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=1)
if att is None:
raise NotAchievedException("No ATTITUDE")
gri = self.assert_receive_message('GPS_RAW_INT', timeout=1)
att = self.assert_receive_message('ATTITUDE', timeout=1)
return mavextra.gps_velocity_body(gri, att)
def wait_speed_vector_bf(self, speed_vector, accuracy=0.2, timeout=30, **kwargs):
@ -6305,10 +6284,7 @@ class AutoTest(ABC): @@ -6305,10 +6284,7 @@ class AutoTest(ABC):
def distance_to_local_position(self, local_pos, timeout=30):
(x, y, z_down) = local_pos # alt is *up*
pos = self.mav.recv_match(
type='LOCAL_POSITION_NED',
blocking=True
)
pos = self.assert_receive_message('LOCAL_POSITION_NED', timeout=timeout)
delta_x = pos.x - x
delta_y = pos.y - y
@ -6531,7 +6507,7 @@ class AutoTest(ABC): @@ -6531,7 +6507,7 @@ class AutoTest(ABC):
seq = self.mav.waypoint_current()
m = self.assert_receive_message('NAV_CONTROLLER_OUTPUT')
wp_dist = m.wp_dist
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
m = self.assert_receive_message('VFR_HUD')
# if we changed mode, fail
if self.mav.flightmode != mode:
@ -6677,17 +6653,12 @@ class AutoTest(ABC): @@ -6677,17 +6653,12 @@ class AutoTest(ABC):
def assert_fence_enabled(self, timeout=2):
# Check fence is enabled
m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=timeout)
m = self.assert_receive_message('FENCE_STATUS', timeout=timeout)
self.progress("Got (%s)" % str(m))
if m is None:
raise NotAchievedException("Fence status was not received")
def assert_fence_disabled(self, timeout=2):
# Check fence is not enabled
m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=timeout)
self.progress("Got (%s)" % str(m))
if m is not None:
raise NotAchievedException("Fence status received unexpectedly")
self.assert_not_receiving_message('FENCE_STATUS', timeout=timeout)
def assert_prearm_failure(self,
expected_statustext,
@ -7702,8 +7673,7 @@ Also, ignores heartbeats not from our target system''' @@ -7702,8 +7673,7 @@ Also, ignores heartbeats not from our target system'''
m = self.mav.messages.get("HOME_POSITION", None)
if use_cached_home is False or m is None:
m = self.poll_home_position(quiet=True)
here = self.mav.recv_match(type='GLOBAL_POSITION_INT',
blocking=True)
here = self.assert_receive_message('GLOBAL_POSITION_INT')
return self.get_distance_int(m, here)
def home_position_as_mav_location(self):
@ -7726,7 +7696,7 @@ Also, ignores heartbeats not from our target system''' @@ -7726,7 +7696,7 @@ Also, ignores heartbeats not from our target system'''
while True:
if self.get_sim_time_cached() - tstart > timeout:
break
m = self.mav.recv_match(type='VFR_HUD', blocking=True)
m = self.assert_receive_message('VFR_HUD', timeout=timeout)
if m.groundspeed > want+tolerance:
raise NotAchievedException("Too fast (%f > %f)" %
(m.groundspeed, want))
@ -8125,7 +8095,7 @@ Also, ignores heartbeats not from our target system''' @@ -8125,7 +8095,7 @@ Also, ignores heartbeats not from our target system'''
while True:
if self.get_sim_time_cached() - tstart > timeout:
raise NotAchievedException("Cannot receive enough MAG_CAL_PROGRESS")
m = self.mav.recv_match(type=["MAG_CAL_PROGRESS", "MAG_CAL_REPORT"], blocking=True, timeout=10)
m = self.assert_receive_message(["MAG_CAL_PROGRESS", "MAG_CAL_REPORT"], timeout=10)
if m.get_type() == "MAG_CAL_REPORT":
if report_get[m.compass_id] == 0:
self.progress("Report: %s" % str(m))
@ -8162,7 +8132,7 @@ Also, ignores heartbeats not from our target system''' @@ -8162,7 +8132,7 @@ Also, ignores heartbeats not from our target system'''
while True:
if self.get_sim_time_cached() - tstart > timeout:
raise NotAchievedException("Cannot receive enough MAG_CAL_PROGRESS")
m = self.mav.recv_match(type=["MAG_CAL_PROGRESS", "MAG_CAL_REPORT"], blocking=True, timeout=5)
m = self.assert_receive_message(["MAG_CAL_PROGRESS", "MAG_CAL_REPORT"], timeout=5)
if m.get_type() == "MAG_CAL_REPORT":
if report_get[m.compass_id] == 0:
self.progress("Report: %s" % self.dump_message_verbose(m))
@ -8505,10 +8475,7 @@ Also, ignores heartbeats not from our target system''' @@ -8505,10 +8475,7 @@ Also, ignores heartbeats not from our target system'''
# wait until we definitely know where we are:
self.poll_home_position(timeout=120)
ss = self.mav.recv_match(type='SIMSTATE', blocking=True, timeout=1)
if ss is None:
raise NotAchievedException("Did not get SIMSTATE")
self.progress("Got SIMSTATE (%s)" % str(ss))
ss = self.assert_receive_message('SIMSTATE', timeout=1, verbose=True)
self.run_cmd(mavutil.mavlink.MAV_CMD_FIXED_MAG_CAL_YAW,
math.degrees(ss.yaw), # param1
@ -9146,7 +9113,7 @@ Also, ignores heartbeats not from our target system''' @@ -9146,7 +9113,7 @@ Also, ignores heartbeats not from our target system'''
# make sure get_message_interval works:
self.send_get_message_interval(victim_message, mav=mav)
m = mav.recv_match(type='MESSAGE_INTERVAL', blocking=True)
m = self.assert_receive_message('MESSAGE_INTERVAL', timeout=30, mav=mav)
if in_rate == 0:
want = self.rate_to_interval_us(expected_rate)
@ -9158,7 +9125,7 @@ Also, ignores heartbeats not from our target system''' @@ -9158,7 +9125,7 @@ Also, ignores heartbeats not from our target system'''
if m.interval_us != want:
raise NotAchievedException("Did not read same interval back from autopilot: want=%d got=%d)" %
(want, m.interval_us))
m = mav.recv_match(type='COMMAND_ACK', blocking=True)
m = self.assert_receive_message('COMMAND_ACK', mav=mav)
if m.result != mavutil.mavlink.MAV_RESULT_ACCEPTED:
raise NotAchievedException("Expected ACCEPTED for reading message interval")
@ -9244,10 +9211,10 @@ Also, ignores heartbeats not from our target system''' @@ -9244,10 +9211,10 @@ Also, ignores heartbeats not from our target system'''
non_existant_id = 145
self.send_get_message_interval(non_existant_id)
m = self.mav.recv_match(type='MESSAGE_INTERVAL', blocking=True)
m = self.assert_receive_message('MESSAGE_INTERVAL')
if m.interval_us != 0:
raise NotAchievedException("Supposed to get 0 back for unsupported stream")
m = self.mav.recv_match(type='COMMAND_ACK', blocking=True)
m = self.assert_receive_message('COMMAND_ACK')
if m.result != mavutil.mavlink.MAV_RESULT_FAILED:
raise NotAchievedException("Getting rate of unsupported message is a failure")
@ -11959,7 +11926,7 @@ switch value''' @@ -11959,7 +11926,7 @@ switch value'''
a = ltm.a()
if a is None:
return
m = self.mav.recv_match(type='ATTITUDE', blocking=True)
m = self.assert_receive_message('ATTITUDE')
pitch = a.pitch()
print("pitch: %s" % str(pitch))

Loading…
Cancel
Save