diff --git a/Tools/autotest/common.py b/Tools/autotest/common.py index fc1c7f5ea6..9216df1875 100644 --- a/Tools/autotest/common.py +++ b/Tools/autotest/common.py @@ -5700,18 +5700,12 @@ class AutoTest(ABC): return getattr(msg, field) raise MsgRcvTimeoutException("Failed to get %s" % field) - def validator(value2, target2=None): - if speed_min <= value2 <= speed_max: - return True - else: - return False - - self.wait_and_maintain( + self.wait_and_maintain_range( value_name=field, - target=(speed_min+speed_max)/2, + minimum=speed_min, + maximum=speed_max, current_value_getter=lambda: get_speed(timeout), - accuracy=(speed_max - speed_min)/2, - validator=lambda value2, target2: validator(value2, target2), + validator=None, timeout=timeout, **kwargs ) @@ -5765,32 +5759,51 @@ class AutoTest(ABC): ) def wait_and_maintain(self, value_name, target, current_value_getter, validator=None, accuracy=2.0, timeout=30, **kwargs): + if type(target) is Vector3: + return self.wait_and_maintain_vector( + value_name, + target, + current_value_getter, + validator, + timeout=30, + **kwargs + ) + return self.wait_and_maintain_range( + value_name, + minimum=target - accuracy/2, + maximum=target + accuracy/2, + current_value_getter=current_value_getter, + validator=validator, + timeout=timeout, + print_diagnostics_as_target_not_range=True, + **kwargs + ) + + def wait_and_maintain_vector(self, + value_name, + target, + current_value_getter, + validator, + timeout=30, + **kwargs): tstart = self.get_sim_time() achieving_duration_start = None - if type(target) is Vector3: - sum_of_achieved_values = Vector3() - last_value = Vector3() - else: - sum_of_achieved_values = 0.0 - last_value = 0.0 + sum_of_achieved_values = Vector3() + last_value = Vector3() count_of_achieved_values = 0 called_function = kwargs.get("called_function", None) minimum_duration = kwargs.get("minimum_duration", 0) if minimum_duration >= timeout: raise ValueError("minimum_duration >= timeout") - if type(target) is Vector3: - self.progress("Waiting for %s=(%s) with accuracy %.02f" % (value_name, str(target), accuracy)) - else: - self.progress("Waiting for %s=%.02f with accuracy %.02f" % (value_name, target, accuracy)) + + self.progress("Waiting for %s=(%s)" % (value_name, str(target))) + last_print_time = 0 while self.get_sim_time_cached() < tstart + timeout: # if we failed to received message with the getter the sim time isn't updated # noqa last_value = current_value_getter() if called_function is not None: called_function(last_value, target) - if validator is not None: - is_value_valid = validator(last_value, target) - else: - is_value_valid = math.fabs(last_value - target) <= accuracy + is_value_valid = validator(last_value, target) if self.get_sim_time_cached() - last_print_time > 1: if is_value_valid: want_or_got = "got" @@ -5800,20 +5813,14 @@ class AutoTest(ABC): if achieving_duration_start is not None: so_far = self.get_sim_time_cached() - achieving_duration_start achieved_duration_bit = " (maintain=%.1f/%.1f)" % (so_far, minimum_duration) - - if type(target) is Vector3: self.progress( - "%s=(%s) (%s (%s) +- %f)%s" % + "%s=(%s) (%s (%s))%s" % (value_name, str(last_value), want_or_got, str(target), - accuracy, achieved_duration_bit) ) - else: - self.progress("%s=%0.2f (%s %f +- %f)%s" % - (value_name, last_value, want_or_got, target, accuracy, achieved_duration_bit)) last_print_time = self.get_sim_time_cached() if is_value_valid: sum_of_achieved_values += last_value @@ -5821,19 +5828,13 @@ class AutoTest(ABC): if achieving_duration_start is None: achieving_duration_start = self.get_sim_time_cached() if self.get_sim_time_cached() - achieving_duration_start >= minimum_duration: - if type(target) is Vector3: - self.progress("Attained %s=%s" % ( - value_name, - str(sum_of_achieved_values * (1.0 / count_of_achieved_values)))) - else: - self.progress("Attained %s=%f" % (value_name, sum_of_achieved_values / count_of_achieved_values)) + self.progress("Attained %s=%s" % ( + value_name, + str(sum_of_achieved_values * (1.0 / count_of_achieved_values)))) return True else: achieving_duration_start = None - if type(target) is Vector3: - sum_of_achieved_values.zero() - else: - sum_of_achieved_values = 0.0 + sum_of_achieved_values.zero() count_of_achieved_values = 0 raise AutoTestTimeoutException( "Failed to attain %s want %s, reached %s" % @@ -5841,6 +5842,100 @@ class AutoTest(ABC): str(target), str(sum_of_achieved_values / count_of_achieved_values) if count_of_achieved_values != 0 else str(last_value))) + def wait_and_maintain_range(self, + value_name, + minimum, + maximum, + current_value_getter, + validator=None, + timeout=30, + print_diagnostics_as_target_not_range=False, + **kwargs): + if print_diagnostics_as_target_not_range: + target = (minimum + maximum) / 2 + accuracy = (maximum - minimum) / 2 + tstart = self.get_sim_time() + achieving_duration_start = None + sum_of_achieved_values = 0.0 + last_value = 0.0 + count_of_achieved_values = 0 + called_function = kwargs.get("called_function", None) + minimum_duration = kwargs.get("minimum_duration", 0) + if minimum_duration >= timeout: + raise ValueError("minimum_duration >= timeout") + if print_diagnostics_as_target_not_range: + self.progress("Waiting for %s=%.02f with accuracy %.02f" % (value_name, target, accuracy)) + else: + self.progress("Waiting for %s between (%s) and (%s)" % (value_name, str(minimum), str(maximum))) + last_print_time = 0 + while self.get_sim_time_cached() < tstart + timeout: # if we failed to received message with the getter the sim time isn't updated # noqa + last_value = current_value_getter() + if called_function is not None: + called_function(last_value, target) + if validator is not None: + if print_diagnostics_as_target_not_range: + is_value_valid = validator(last_value, target) + else: + is_value_valid = validator(last_value, minimum, maximum) + else: + is_value_valid = (minimum <= last_value) and (last_value <= maximum) + if self.get_sim_time_cached() - last_print_time > 1: + if is_value_valid: + want_or_got = "got" + else: + want_or_got = "want" + achieved_duration_bit = "" + if achieving_duration_start is not None: + so_far = self.get_sim_time_cached() - achieving_duration_start + achieved_duration_bit = " (maintain=%.1f/%.1f)" % (so_far, minimum_duration) + + if print_diagnostics_as_target_not_range: + self.progress( + "%s=%0.2f (%s %f +- %f)%s" % + (value_name, + last_value, + want_or_got, + target, + accuracy, + achieved_duration_bit) + ) + else: + self.progress( + "%s=%0.2f (%s between %s and %s)%s" % + (value_name, + last_value, + want_or_got, + str(minimum), + str(maximum), + achieved_duration_bit) + ) + last_print_time = self.get_sim_time_cached() + if is_value_valid: + sum_of_achieved_values += last_value + count_of_achieved_values += 1.0 + if achieving_duration_start is None: + achieving_duration_start = self.get_sim_time_cached() + if self.get_sim_time_cached() - achieving_duration_start >= minimum_duration: + self.progress("Attained %s=%f" % (value_name, sum_of_achieved_values / count_of_achieved_values)) + return True + else: + achieving_duration_start = None + sum_of_achieved_values = 0.0 + count_of_achieved_values = 0 + if print_diagnostics_as_target_not_range: + raise AutoTestTimeoutException( + "Failed to attain %s want %s, reached %s" % + (value_name, + str(target), + str(sum_of_achieved_values / count_of_achieved_values) if count_of_achieved_values != 0 else str(last_value))) + else: + raise AutoTestTimeoutException( + "Failed to attain %s between %s and %s, reached %s" % + (value_name, + str(minimum), + str(maximum), + str(sum_of_achieved_values / count_of_achieved_values) if count_of_achieved_values != 0 else str(last_value))) + def wait_heading(self, heading, accuracy=5, timeout=30, **kwargs): """Wait for a given heading.""" def get_heading_wrapped(timeout2):