From 8a13fb0f6d5a78df6b7e856a476821014fbf243d Mon Sep 17 00:00:00 2001 From: Peter Barker Date: Mon, 22 Aug 2022 11:33:59 +1000 Subject: [PATCH] autotest: use assert_receive_message Removes several block-forever calls --- Tools/autotest/common.py | 101 +++++++++++++-------------------------- 1 file changed, 34 insertions(+), 67 deletions(-) diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index 02317c0a07..f81890ea36 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -3111,12 +3111,9 @@ class AutoTest(ABC): 1, # target component log_id, log_id) - log_entry = self.mav.recv_match(type='LOG_ENTRY', - blocking=True, - timeout=2) + log_entry = self.assert_receive_message('LOG_ENTRY', timeout=2, verbose=True) if log_entry.size != len(actual_bytes): raise NotAchievedException("Incorrect bytecount") - self.progress("Using log entry (%s)" % str(log_entry)) if log_entry.id != log_id: raise NotAchievedException("Incorrect log id received") @@ -5667,11 +5664,7 @@ class AutoTest(ABC): def get_altitude(self, relative=False, timeout=30): '''returns vehicles altitude in metres, possibly relative-to-home''' - msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', - blocking=True, - timeout=timeout) - if msg is None: - raise MsgRcvTimeoutException("Failed to get Global Position") + msg = self.assert_receive_message("GLOBAL_POSITION_INT", timeout=timeout) if relative: return msg.relative_alt / 1000.0 # mm -> m return msg.alt / 1000.0 # mm -> m @@ -5805,10 +5798,8 @@ class AutoTest(ABC): assert speed_min <= speed_max, "Minimum speed should be less than maximum speed." def get_climbrate(timeout2): - msg = self.mav.recv_match(type='VFR_HUD', blocking=True, timeout=timeout2) - if msg: - return msg.climb - raise MsgRcvTimeoutException("Failed to get climb rate") + msg = self.assert_receive_message('VFR_HUD', timeout=timeout2) + return msg.climb def validator(value2, target2=None): if speed_min <= value2 <= speed_max: @@ -5837,10 +5828,8 @@ class AutoTest(ABC): assert speed_min <= speed_max, "Minimum speed should be less than maximum speed." def get_speed(timeout2): - msg = self.mav.recv_match(type='VFR_HUD', blocking=True, timeout=timeout2) - if msg: - return getattr(msg, field) - raise MsgRcvTimeoutException("Failed to get %s" % field) + msg = self.assert_receive_message('VFR_HUD', timeout=timeout2) + return getattr(msg, field) self.wait_and_maintain_range( value_name=field, @@ -5855,13 +5844,11 @@ class AutoTest(ABC): def wait_roll(self, roll, accuracy, timeout=30, **kwargs): """Wait for a given roll in degrees.""" def get_roll(timeout2): - msg = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=timeout2) - if msg: - p = math.degrees(msg.pitch) - r = math.degrees(msg.roll) - self.progress("Roll %d Pitch %d" % (r, p)) - return r - raise MsgRcvTimeoutException("Failed to get Roll") + msg = self.assert_receive_message('ATTITUDE', timeout=timeout2) + p = math.degrees(msg.pitch) + r = math.degrees(msg.roll) + self.progress("Roll %d Pitch %d" % (r, p)) + return r def validator(value2, target2): return math.fabs((value2 - target2 + 180) % 360 - 180) <= accuracy @@ -5879,13 +5866,11 @@ class AutoTest(ABC): def wait_pitch(self, pitch, accuracy, timeout=30, **kwargs): """Wait for a given pitch in degrees.""" def get_pitch(timeout2): - msg = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=timeout2) - if msg: - p = math.degrees(msg.pitch) - r = math.degrees(msg.roll) - self.progress("Pitch %d Roll %d" % (p, r)) - return p - raise MsgRcvTimeoutException("Failed to get Pitch") + msg = self.assert_receive_message('ATTITUDE', timeout=timeout2) + p = math.degrees(msg.pitch) + r = math.degrees(msg.roll) + self.progress("Pitch %d Roll %d" % (p, r)) + return p def validator(value2, target2): return math.fabs((value2 - target2 + 180) % 360 - 180) <= accuracy @@ -6118,10 +6103,8 @@ class AutoTest(ABC): def wait_yaw_speed(self, yaw_speed, accuracy=0.1, timeout=30, **kwargs): """Wait for a given yaw speed in radians per second.""" def get_yawspeed(timeout2): - msg = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=timeout2) - if msg: - return msg.yawspeed - raise MsgRcvTimeoutException("Failed to get yaw speed") + msg = self.assert_receive_message('ATTITUDE', timeout=timeout2) + return msg.yawspeed def validator(value2, target2): return math.fabs(value2 - target2) <= accuracy @@ -6177,12 +6160,8 @@ class AutoTest(ABC): ) def get_body_frame_velocity(self): - gri = self.mav.recv_match(type='GPS_RAW_INT', blocking=True, timeout=1) - if gri is None: - raise NotAchievedException("No GPS_RAW_INT") - att = self.mav.recv_match(type='ATTITUDE', blocking=True, timeout=1) - if att is None: - raise NotAchievedException("No ATTITUDE") + gri = self.assert_receive_message('GPS_RAW_INT', timeout=1) + att = self.assert_receive_message('ATTITUDE', timeout=1) return mavextra.gps_velocity_body(gri, att) def wait_speed_vector_bf(self, speed_vector, accuracy=0.2, timeout=30, **kwargs): @@ -6305,10 +6284,7 @@ class AutoTest(ABC): def distance_to_local_position(self, local_pos, timeout=30): (x, y, z_down) = local_pos # alt is *up* - pos = self.mav.recv_match( - type='LOCAL_POSITION_NED', - blocking=True - ) + pos = self.assert_receive_message('LOCAL_POSITION_NED', timeout=timeout) delta_x = pos.x - x delta_y = pos.y - y @@ -6531,7 +6507,7 @@ class AutoTest(ABC): seq = self.mav.waypoint_current() m = self.assert_receive_message('NAV_CONTROLLER_OUTPUT') wp_dist = m.wp_dist - m = self.mav.recv_match(type='VFR_HUD', blocking=True) + m = self.assert_receive_message('VFR_HUD') # if we changed mode, fail if self.mav.flightmode != mode: @@ -6677,17 +6653,12 @@ class AutoTest(ABC): def assert_fence_enabled(self, timeout=2): # Check fence is enabled - m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=timeout) + m = self.assert_receive_message('FENCE_STATUS', timeout=timeout) self.progress("Got (%s)" % str(m)) - if m is None: - raise NotAchievedException("Fence status was not received") def assert_fence_disabled(self, timeout=2): # Check fence is not enabled - m = self.mav.recv_match(type='FENCE_STATUS', blocking=True, timeout=timeout) - self.progress("Got (%s)" % str(m)) - if m is not None: - raise NotAchievedException("Fence status received unexpectedly") + self.assert_not_receiving_message('FENCE_STATUS', timeout=timeout) def assert_prearm_failure(self, expected_statustext, @@ -7702,8 +7673,7 @@ Also, ignores heartbeats not from our target system''' m = self.mav.messages.get("HOME_POSITION", None) if use_cached_home is False or m is None: m = self.poll_home_position(quiet=True) - here = self.mav.recv_match(type='GLOBAL_POSITION_INT', - blocking=True) + here = self.assert_receive_message('GLOBAL_POSITION_INT') return self.get_distance_int(m, here) def home_position_as_mav_location(self): @@ -7726,7 +7696,7 @@ Also, ignores heartbeats not from our target system''' while True: if self.get_sim_time_cached() - tstart > timeout: break - m = self.mav.recv_match(type='VFR_HUD', blocking=True) + m = self.assert_receive_message('VFR_HUD', timeout=timeout) if m.groundspeed > want+tolerance: raise NotAchievedException("Too fast (%f > %f)" % (m.groundspeed, want)) @@ -8125,7 +8095,7 @@ Also, ignores heartbeats not from our target system''' while True: if self.get_sim_time_cached() - tstart > timeout: raise NotAchievedException("Cannot receive enough MAG_CAL_PROGRESS") - m = self.mav.recv_match(type=["MAG_CAL_PROGRESS", "MAG_CAL_REPORT"], blocking=True, timeout=10) + m = self.assert_receive_message(["MAG_CAL_PROGRESS", "MAG_CAL_REPORT"], timeout=10) if m.get_type() == "MAG_CAL_REPORT": if report_get[m.compass_id] == 0: self.progress("Report: %s" % str(m)) @@ -8162,7 +8132,7 @@ Also, ignores heartbeats not from our target system''' while True: if self.get_sim_time_cached() - tstart > timeout: raise NotAchievedException("Cannot receive enough MAG_CAL_PROGRESS") - m = self.mav.recv_match(type=["MAG_CAL_PROGRESS", "MAG_CAL_REPORT"], blocking=True, timeout=5) + m = self.assert_receive_message(["MAG_CAL_PROGRESS", "MAG_CAL_REPORT"], timeout=5) if m.get_type() == "MAG_CAL_REPORT": if report_get[m.compass_id] == 0: self.progress("Report: %s" % self.dump_message_verbose(m)) @@ -8505,10 +8475,7 @@ Also, ignores heartbeats not from our target system''' # wait until we definitely know where we are: self.poll_home_position(timeout=120) - ss = self.mav.recv_match(type='SIMSTATE', blocking=True, timeout=1) - if ss is None: - raise NotAchievedException("Did not get SIMSTATE") - self.progress("Got SIMSTATE (%s)" % str(ss)) + ss = self.assert_receive_message('SIMSTATE', timeout=1, verbose=True) self.run_cmd(mavutil.mavlink.MAV_CMD_FIXED_MAG_CAL_YAW, math.degrees(ss.yaw), # param1 @@ -9146,7 +9113,7 @@ Also, ignores heartbeats not from our target system''' # make sure get_message_interval works: self.send_get_message_interval(victim_message, mav=mav) - m = mav.recv_match(type='MESSAGE_INTERVAL', blocking=True) + m = self.assert_receive_message('MESSAGE_INTERVAL', timeout=30, mav=mav) if in_rate == 0: want = self.rate_to_interval_us(expected_rate) @@ -9158,7 +9125,7 @@ Also, ignores heartbeats not from our target system''' if m.interval_us != want: raise NotAchievedException("Did not read same interval back from autopilot: want=%d got=%d)" % (want, m.interval_us)) - m = mav.recv_match(type='COMMAND_ACK', blocking=True) + m = self.assert_receive_message('COMMAND_ACK', mav=mav) if m.result != mavutil.mavlink.MAV_RESULT_ACCEPTED: raise NotAchievedException("Expected ACCEPTED for reading message interval") @@ -9244,10 +9211,10 @@ Also, ignores heartbeats not from our target system''' non_existant_id = 145 self.send_get_message_interval(non_existant_id) - m = self.mav.recv_match(type='MESSAGE_INTERVAL', blocking=True) + m = self.assert_receive_message('MESSAGE_INTERVAL') if m.interval_us != 0: raise NotAchievedException("Supposed to get 0 back for unsupported stream") - m = self.mav.recv_match(type='COMMAND_ACK', blocking=True) + m = self.assert_receive_message('COMMAND_ACK') if m.result != mavutil.mavlink.MAV_RESULT_FAILED: raise NotAchievedException("Getting rate of unsupported message is a failure") @@ -11959,7 +11926,7 @@ switch value''' a = ltm.a() if a is None: return - m = self.mav.recv_match(type='ATTITUDE', blocking=True) + m = self.assert_receive_message('ATTITUDE') pitch = a.pitch() print("pitch: %s" % str(pitch))