|
|
@ -5,9 +5,9 @@ import unittest |
|
|
|
import rospy |
|
|
|
import rospy |
|
|
|
import math |
|
|
|
import math |
|
|
|
from geometry_msgs.msg import PoseStamped |
|
|
|
from geometry_msgs.msg import PoseStamped |
|
|
|
from mavros_msgs.msg import Altitude, ExtendedState, HomePosition, State, \ |
|
|
|
from mavros_msgs.msg import Altitude, ExtendedState, HomePosition, ParamValue, State, \ |
|
|
|
WaypointList |
|
|
|
WaypointList |
|
|
|
from mavros_msgs.srv import CommandBool, ParamGet, SetMode, WaypointClear, \ |
|
|
|
from mavros_msgs.srv import CommandBool, ParamGet, ParamSet, SetMode, SetModeRequest, WaypointClear, \ |
|
|
|
WaypointPush |
|
|
|
WaypointPush |
|
|
|
from pymavlink import mavutil |
|
|
|
from pymavlink import mavutil |
|
|
|
from sensor_msgs.msg import NavSatFix, Imu |
|
|
|
from sensor_msgs.msg import NavSatFix, Imu |
|
|
@ -42,6 +42,7 @@ class MavrosTestCommon(unittest.TestCase): |
|
|
|
rospy.loginfo("waiting for ROS services") |
|
|
|
rospy.loginfo("waiting for ROS services") |
|
|
|
try: |
|
|
|
try: |
|
|
|
rospy.wait_for_service('mavros/param/get', service_timeout) |
|
|
|
rospy.wait_for_service('mavros/param/get', service_timeout) |
|
|
|
|
|
|
|
rospy.wait_for_service('mavros/param/set', service_timeout) |
|
|
|
rospy.wait_for_service('mavros/cmd/arming', service_timeout) |
|
|
|
rospy.wait_for_service('mavros/cmd/arming', service_timeout) |
|
|
|
rospy.wait_for_service('mavros/mission/push', service_timeout) |
|
|
|
rospy.wait_for_service('mavros/mission/push', service_timeout) |
|
|
|
rospy.wait_for_service('mavros/mission/clear', service_timeout) |
|
|
|
rospy.wait_for_service('mavros/mission/clear', service_timeout) |
|
|
@ -50,6 +51,7 @@ class MavrosTestCommon(unittest.TestCase): |
|
|
|
except rospy.ROSException: |
|
|
|
except rospy.ROSException: |
|
|
|
self.fail("failed to connect to services") |
|
|
|
self.fail("failed to connect to services") |
|
|
|
self.get_param_srv = rospy.ServiceProxy('mavros/param/get', ParamGet) |
|
|
|
self.get_param_srv = rospy.ServiceProxy('mavros/param/get', ParamGet) |
|
|
|
|
|
|
|
self.set_param_srv = rospy.ServiceProxy('mavros/param/set', ParamSet) |
|
|
|
self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming', |
|
|
|
self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming', |
|
|
|
CommandBool) |
|
|
|
CommandBool) |
|
|
|
self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode) |
|
|
|
self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode) |
|
|
@ -234,6 +236,36 @@ class MavrosTestCommon(unittest.TestCase): |
|
|
|
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}". |
|
|
|
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}". |
|
|
|
format(mode, old_mode, timeout))) |
|
|
|
format(mode, old_mode, timeout))) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_param(self, param_id, param_value, timeout): |
|
|
|
|
|
|
|
"""param: PX4 param string, ParamValue, timeout(int): seconds""" |
|
|
|
|
|
|
|
if param_value.integer != 0: |
|
|
|
|
|
|
|
value = param_value.integer |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
value = param_value.real |
|
|
|
|
|
|
|
rospy.loginfo("setting PX4 parameter: {0} with value {1}". |
|
|
|
|
|
|
|
format(param_id, value)) |
|
|
|
|
|
|
|
loop_freq = 1 # Hz |
|
|
|
|
|
|
|
rate = rospy.Rate(loop_freq) |
|
|
|
|
|
|
|
param_set = False |
|
|
|
|
|
|
|
for i in xrange(timeout * loop_freq): |
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
res = self.set_param_srv(param_id, param_value) |
|
|
|
|
|
|
|
if res.success: |
|
|
|
|
|
|
|
rospy.loginfo("param {0} set to {1} | seconds: {2} of {3}". |
|
|
|
|
|
|
|
format(param_id, value, i / loop_freq, timeout)) |
|
|
|
|
|
|
|
break |
|
|
|
|
|
|
|
except rospy.ServiceException as e: |
|
|
|
|
|
|
|
rospy.logerr(e) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
rate.sleep() |
|
|
|
|
|
|
|
except rospy.ROSException as e: |
|
|
|
|
|
|
|
self.fail(e) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.assertTrue(res.success, ( |
|
|
|
|
|
|
|
"failed to set param | param_id: {0}, param_value: {1} | timeout(seconds): {2}". |
|
|
|
|
|
|
|
format(param_id, value, timeout))) |
|
|
|
|
|
|
|
|
|
|
|
def wait_for_topics(self, timeout): |
|
|
|
def wait_for_topics(self, timeout): |
|
|
|
"""wait for simulation to be ready, make sure we're getting topic info |
|
|
|
"""wait for simulation to be ready, make sure we're getting topic info |
|
|
|
from all topics by checking dictionary of flag values set in callbacks, |
|
|
|
from all topics by checking dictionary of flag values set in callbacks, |
|
|
|