Browse Source

improve mavros SITL tests (#8652)

-created a test base class to centralize redundant methods among the different tests
-added mission waypoint list topic listener (this also helps make sure the simulation is ready)
-check number of mission waypoints in FCU against mission
-increase time for mavros topics to be ready from 30 to 60 seconds
-reduce position check loop rates
-clean up logging
-support QGC plan for mission file format, see #8619
-vehicle is an arg for mission test launch file, working toward other airframes
-Jenkins: fix vtol vehicle arg value
-get MAV_TYPE param and use FW radius for pure fixed-wing mission position check
-remove unused vehicle arg from test in multiple tests launch, clearing runtime warning
sbg
Anthony Lamping 7 years ago committed by Daniel Agar
parent
commit
d375880c4b
  1. 10
      Jenkinsfile
  2. 0
      integrationtests/python_src/px4_it/mavros/__init__.py
  3. 149
      integrationtests/python_src/px4_it/mavros/mavros_offboard_attctl_test.py
  4. 168
      integrationtests/python_src/px4_it/mavros/mavros_offboard_posctl_test.py
  5. 375
      integrationtests/python_src/px4_it/mavros/mavros_test_common.py
  6. 403
      integrationtests/python_src/px4_it/mavros/mission_test.py
  7. 3
      test/mavros_posix_test_mission.test
  8. 12
      test/mavros_posix_tests_missions.test

10
Jenkinsfile vendored

@ -358,7 +358,7 @@ pipeline { @@ -358,7 +358,7 @@ pipeline {
sh 'git fetch --tags'
sh 'make posix_sitl_default'
sh 'make posix_sitl_default sitl_gazebo'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_new_1.txt vehicle:=vtol_standard'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_new_1.txt vehicle:=standard_vtol'
}
post {
success {
@ -391,7 +391,7 @@ pipeline { @@ -391,7 +391,7 @@ pipeline {
sh 'git fetch --tags'
sh 'make posix_sitl_default'
sh 'make posix_sitl_default sitl_gazebo'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_new_2.txt vehicle:=vtol_standard'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_new_2.txt vehicle:=standard_vtol'
}
post {
success {
@ -424,7 +424,7 @@ pipeline { @@ -424,7 +424,7 @@ pipeline {
sh 'git fetch --tags'
sh 'make posix_sitl_default'
sh 'make posix_sitl_default sitl_gazebo'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_old_1.txt vehicle:=vtol_standard'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_old_1.txt vehicle:=standard_vtol'
}
post {
success {
@ -457,7 +457,7 @@ pipeline { @@ -457,7 +457,7 @@ pipeline {
sh 'git fetch --tags'
sh 'make posix_sitl_default'
sh 'make posix_sitl_default sitl_gazebo'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_old_2.txt vehicle:=vtol_standard'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_old_2.txt vehicle:=standard_vtol'
}
post {
success {
@ -490,7 +490,7 @@ pipeline { @@ -490,7 +490,7 @@ pipeline {
sh 'git fetch --tags'
sh 'make posix_sitl_default'
sh 'make posix_sitl_default sitl_gazebo'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_old_3.txt vehicle:=vtol_standard'
sh './test/rostest_px4_run.sh mavros_posix_test_mission.test mission:=vtol_old_3.txt vehicle:=standard_vtol'
}
post {
success {

0
integrationtests/python_src/px4_it/mavros/__init__.py

149
integrationtests/python_src/px4_it/mavros/mavros_offboard_attctl_test.py

@ -41,17 +41,16 @@ from __future__ import division @@ -41,17 +41,16 @@ from __future__ import division
PKG = 'px4'
import unittest
import rospy
from geometry_msgs.msg import Quaternion, Vector3
from mavros_msgs.msg import AttitudeTarget, ExtendedState
from mavros_test_common import MavrosTestCommon
from std_msgs.msg import Header
from threading import Thread
from tf.transformations import quaternion_from_euler
from geometry_msgs.msg import PoseStamped, Quaternion, Vector3
from mavros_msgs.msg import AttitudeTarget, HomePosition, State
from mavros_msgs.srv import CommandBool, SetMode
from std_msgs.msg import Header
class MavrosOffboardAttctlTest(unittest.TestCase):
class MavrosOffboardAttctlTest(MavrosTestCommon):
"""
Tests flying in offboard control by sending attitude and thrust setpoints
via MAVROS.
@ -60,34 +59,12 @@ class MavrosOffboardAttctlTest(unittest.TestCase): @@ -60,34 +59,12 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
"""
def setUp(self):
self.local_position = PoseStamped()
self.state = State()
self.att = AttitudeTarget()
self.sub_topics_ready = {
key: False
for key in ['local_pos', 'home_pos', 'state']
}
super(MavrosOffboardAttctlTest, self).setUp()
# setup ROS topics and services
try:
rospy.wait_for_service('mavros/cmd/arming', 30)
rospy.wait_for_service('mavros/set_mode', 30)
except rospy.ROSException:
self.fail("failed to connect to mavros services")
self.att = AttitudeTarget()
self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming',
CommandBool)
self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode)
self.local_pos_sub = rospy.Subscriber('mavros/local_position/pose',
PoseStamped,
self.local_position_callback)
self.home_pos_sub = rospy.Subscriber('mavros/home_position/home',
HomePosition,
self.home_position_callback)
self.state_sub = rospy.Subscriber('mavros/state', State,
self.state_callback)
self.att_setpoint_pub = rospy.Publisher(
'mavros/setpoint_raw/attitude', AttitudeTarget, queue_size=10)
'mavros/setpoint_raw/attitude', AttitudeTarget, queue_size=1)
# send setpoints in seperate thread to better prevent failsafe
self.att_thread = Thread(target=self.send_att, args=())
@ -97,29 +74,6 @@ class MavrosOffboardAttctlTest(unittest.TestCase): @@ -97,29 +74,6 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
def tearDown(self):
pass
#
# Callback functions
#
def local_position_callback(self, data):
self.local_position = data
if not self.sub_topics_ready['local_pos']:
self.sub_topics_ready['local_pos'] = True
def home_position_callback(self, data):
# this topic publishing seems to be a better indicator that the sim
# is ready, it's not actually needed
self.home_pos_sub.unregister()
if not self.sub_topics_ready['home_pos']:
self.sub_topics_ready['home_pos'] = True
def state_callback(self, data):
self.state = data
if not self.sub_topics_ready['state']:
self.sub_topics_ready['state'] = True
#
# Helper methods
#
@ -141,81 +95,6 @@ class MavrosOffboardAttctlTest(unittest.TestCase): @@ -141,81 +95,6 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
except rospy.ROSInterruptException:
pass
def set_mode(self, mode, timeout):
"""mode: PX4 mode string, timeout(int): seconds"""
old_mode = self.state.mode
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
mode_set = False
for i in xrange(timeout * loop_freq):
if self.state.mode == mode:
mode_set = True
rospy.loginfo(
"set mode success | new mode: {0}, old mode: {1} | seconds: {2} of {3}".
format(mode, self.state.mode, i / loop_freq, timeout))
break
else:
try:
res = self.set_mode_srv(0, mode) # 0 is custom mode
if not res.mode_sent:
rospy.logerr("failed to send mode command")
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(mode_set, (
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
format(mode, old_mode, timeout)))
def set_arm(self, arm, timeout):
"""arm: True to arm or False to disarm, timeout(int): seconds"""
old_arm = self.state.armed
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
arm_set = False
for i in xrange(timeout * loop_freq):
if self.state.armed == arm:
arm_set = True
rospy.loginfo(
"set arm success | new arm: {0}, old arm: {1} | seconds: {2} of {3}".
format(arm, old_arm, i / loop_freq, timeout))
break
else:
try:
res = self.set_arming_srv(arm)
if not res.success:
rospy.logerr("failed to send arm command")
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(arm_set, (
"failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
format(arm, self.state.armed, timeout)))
def wait_for_topics(self, timeout):
"""wait for simulation to be ready, make sure we're getting topic info
from all topics by checking dictionary of flag values set in callbacks,
timeout(int): seconds"""
rospy.loginfo("waiting for simulation topics to be ready")
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
simulation_ready = False
for i in xrange(timeout * loop_freq):
if all(value for value in self.sub_topics_ready.values()):
simulation_ready = True
rospy.loginfo("simulation topics ready | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
rate.sleep()
self.assertTrue(simulation_ready, (
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
format(self.sub_topics_ready, timeout)))
#
# Test method
#
@ -226,12 +105,11 @@ class MavrosOffboardAttctlTest(unittest.TestCase): @@ -226,12 +105,11 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
boundary_y = 5
boundary_z = -5
# delay starting the mission
self.wait_for_topics(30)
# make sure the simulation is ready to start the mission
self.wait_for_topics(60)
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND, 10, -1)
rospy.loginfo("seting mission mode")
self.set_mode("OFFBOARD", 5)
rospy.loginfo("arming")
self.set_arm(True, 5)
rospy.loginfo("run mission")
@ -239,7 +117,7 @@ class MavrosOffboardAttctlTest(unittest.TestCase): @@ -239,7 +117,7 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
format(boundary_x, boundary_y, boundary_z))
# does it cross expected boundaries in 'timeout' seconds?
timeout = 12 # (int) seconds
loop_freq = 10 # Hz
loop_freq = 2 # Hz
rate = rospy.Rate(loop_freq)
crossed = False
for i in xrange(timeout * loop_freq):
@ -254,12 +132,11 @@ class MavrosOffboardAttctlTest(unittest.TestCase): @@ -254,12 +132,11 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
rate.sleep()
self.assertTrue(crossed, (
"took too long to cross boundaries | current position x: {0}, y: {1}, z: {2} | timeout(seconds): {3}".
"took too long to cross boundaries | current position x: {0:.2f}, y: {1:.2f}, z: {2:.2f} | timeout(seconds): {3}".
format(self.local_position.pose.position.x,
self.local_position.pose.position.y,
self.local_position.pose.position.z, timeout)))
rospy.loginfo("disarming")
self.set_arm(False, 5)

168
integrationtests/python_src/px4_it/mavros/mavros_offboard_posctl_test.py

@ -41,19 +41,18 @@ from __future__ import division @@ -41,19 +41,18 @@ from __future__ import division
PKG = 'px4'
import unittest
import rospy
import math
import numpy as np
from threading import Thread
from tf.transformations import quaternion_from_euler
from geometry_msgs.msg import PoseStamped, Quaternion
from mavros_msgs.msg import HomePosition, State
from mavros_msgs.srv import CommandBool, SetMode
from mavros_msgs.msg import ExtendedState
from mavros_test_common import MavrosTestCommon
from std_msgs.msg import Header
from threading import Thread
from tf.transformations import quaternion_from_euler
class MavrosOffboardPosctlTest(unittest.TestCase):
class MavrosOffboardPosctlTest(MavrosTestCommon):
"""
Tests flying a path in offboard control by sending position setpoints
via MAVROS.
@ -64,34 +63,13 @@ class MavrosOffboardPosctlTest(unittest.TestCase): @@ -64,34 +63,13 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
"""
def setUp(self):
self.local_position = PoseStamped()
self.state = State()
self.pos = PoseStamped()
self.sub_topics_ready = {
key: False
for key in ['local_pos', 'home_pos', 'state']
}
super(MavrosOffboardPosctlTest, self).setUp()
# setup ROS topics and services
try:
rospy.wait_for_service('mavros/cmd/arming', 30)
rospy.wait_for_service('mavros/set_mode', 30)
except rospy.ROSException:
self.fail("failed to connect to mavros services")
self.pos = PoseStamped()
self.radius = 1
self.set_arming_srv = rospy.ServiceProxy('/mavros/cmd/arming',
CommandBool)
self.set_mode_srv = rospy.ServiceProxy('/mavros/set_mode', SetMode)
self.local_pos_sub = rospy.Subscriber('mavros/local_position/pose',
PoseStamped,
self.local_position_callback)
self.home_pos_sub = rospy.Subscriber('mavros/home_position/home',
HomePosition,
self.home_position_callback)
self.state_sub = rospy.Subscriber('mavros/state', State,
self.state_callback)
self.pos_setpoint_pub = rospy.Publisher(
'mavros/setpoint_position/local', PoseStamped, queue_size=10)
'mavros/setpoint_position/local', PoseStamped, queue_size=1)
# send setpoints in seperate thread to better prevent failsafe
self.pos_thread = Thread(target=self.send_pos, args=())
@ -101,29 +79,6 @@ class MavrosOffboardPosctlTest(unittest.TestCase): @@ -101,29 +79,6 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
def tearDown(self):
pass
#
# Callback functions
#
def local_position_callback(self, data):
self.local_position = data
if not self.sub_topics_ready['local_pos']:
self.sub_topics_ready['local_pos'] = True
def home_position_callback(self, data):
# this topic publishing seems to be a better indicator that the sim
# is ready, it's not actually needed
self.home_pos_sub.unregister()
if not self.sub_topics_ready['home_pos']:
self.sub_topics_ready['home_pos'] = True
def state_callback(self, data):
self.state = data
if not self.sub_topics_ready['state']:
self.sub_topics_ready['state'] = True
#
# Helper methods
#
@ -140,85 +95,12 @@ class MavrosOffboardPosctlTest(unittest.TestCase): @@ -140,85 +95,12 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
except rospy.ROSInterruptException:
pass
def set_mode(self, mode, timeout):
"""mode: PX4 mode string, timeout(int): seconds"""
old_mode = self.state.mode
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
mode_set = False
for i in xrange(timeout * loop_freq):
if self.state.mode == mode:
mode_set = True
rospy.loginfo(
"set mode success | new mode: {0}, old mode: {1} | seconds: {2} of {3}".
format(mode, old_mode, i / loop_freq, timeout))
break
else:
try:
res = self.set_mode_srv(0, mode) # 0 is custom mode
if not res.mode_sent:
rospy.logerr("failed to send mode command")
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(mode_set, (
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
format(mode, self.state.mode, timeout)))
def set_arm(self, arm, timeout):
"""arm: True to arm or False to disarm, timeout(int): seconds"""
old_arm = self.state.armed
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
arm_set = False
for i in xrange(timeout * loop_freq):
if self.state.armed == arm:
arm_set = True
rospy.loginfo(
"set arm success | new arm: {0}, old arm: {1} | seconds: {2} of {3}".
format(arm, old_arm, i / loop_freq, timeout))
break
else:
try:
res = self.set_arming_srv(arm)
if not res.success:
rospy.logerr("failed to send arm command")
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(arm_set, (
"failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
format(arm, self.state.armed, timeout)))
def wait_for_topics(self, timeout):
"""wait for simulation to be ready, make sure we're getting topic info
from all topics by checking dictionary of flag values set in callbacks,
timeout(int): seconds"""
rospy.loginfo("waiting for simulation topics to be ready")
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
simulation_ready = False
for i in xrange(timeout * loop_freq):
if all(value for value in self.sub_topics_ready.values()):
simulation_ready = True
rospy.loginfo("simulation topics ready | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
rate.sleep()
self.assertTrue(simulation_ready, (
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
format(self.sub_topics_ready, timeout)))
def is_at_position(self, x, y, z, offset):
rospy.logdebug("current position | x:{0}, y:{1}, z:{2}".format(
self.local_position.pose.position.x, self.local_position.pose.
position.y, self.local_position.pose.position.z))
"""offset: meters"""
rospy.logdebug(
"current position | x:{0:.2f}, y:{1:.2f}, z:{2:.2f}".format(
self.local_position.pose.position.x, self.local_position.pose.
position.y, self.local_position.pose.position.z))
desired = np.array((x, y, z))
pos = np.array((self.local_position.pose.position.x,
@ -227,12 +109,16 @@ class MavrosOffboardPosctlTest(unittest.TestCase): @@ -227,12 +109,16 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
return np.linalg.norm(desired - pos) < offset
def reach_position(self, x, y, z, timeout):
"""timeout(int): seconds"""
# set a position setpoint
self.pos.pose.position.x = x
self.pos.pose.position.y = y
self.pos.pose.position.z = z
rospy.loginfo("attempting to reach position | x: {0}, y: {1}, z: {2}".
format(x, y, z))
rospy.loginfo(
"attempting to reach position | x: {0}, y: {1}, z: {2} | current position x: {3:.2f}, y: {4:.2f}, z: {5:.2f}".
format(x, y, z, self.local_position.pose.position.x,
self.local_position.pose.position.y,
self.local_position.pose.position.z))
# For demo purposes we will lock yaw/heading to north.
yaw_degrees = 0 # North
@ -241,13 +127,13 @@ class MavrosOffboardPosctlTest(unittest.TestCase): @@ -241,13 +127,13 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
self.pos.pose.orientation = Quaternion(*quaternion)
# does it reach the position in 'timeout' seconds?
loop_freq = 10 # Hz
loop_freq = 2 # Hz
rate = rospy.Rate(loop_freq)
reached = False
for i in xrange(timeout * loop_freq):
if self.is_at_position(self.pos.pose.position.x,
self.pos.pose.position.y,
self.pos.pose.position.z, 1):
self.pos.pose.position.z, self.radius):
rospy.loginfo("position reached | seconds: {0} of {1}".format(
i / loop_freq, timeout))
reached = True
@ -256,7 +142,7 @@ class MavrosOffboardPosctlTest(unittest.TestCase): @@ -256,7 +142,7 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
rate.sleep()
self.assertTrue(reached, (
"took too long to get to position | current position x: {0}, y: {1}, z: {2} | timeout(seconds): {3}".
"took too long to get to position | current position x: {0:.2f}, y: {1:.2f}, z: {2:.2f} | timeout(seconds): {3}".
format(self.local_position.pose.position.x,
self.local_position.pose.position.y,
self.local_position.pose.position.z, timeout)))
@ -267,12 +153,11 @@ class MavrosOffboardPosctlTest(unittest.TestCase): @@ -267,12 +153,11 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
def test_posctl(self):
"""Test offboard position control"""
# delay starting the mission
self.wait_for_topics(30)
# make sure the simulation is ready to start the mission
self.wait_for_topics(60)
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND, 10, -1)
rospy.loginfo("seting mission mode")
self.set_mode("OFFBOARD", 5)
rospy.loginfo("arming")
self.set_arm(True, 5)
rospy.loginfo("run mission")
@ -282,7 +167,6 @@ class MavrosOffboardPosctlTest(unittest.TestCase): @@ -282,7 +167,6 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
self.reach_position(positions[i][0], positions[i][1],
positions[i][2], 18)
rospy.loginfo("disarming")
self.set_arm(False, 5)

375
integrationtests/python_src/px4_it/mavros/mavros_test_common.py

@ -0,0 +1,375 @@ @@ -0,0 +1,375 @@
#!/usr/bin/env python2
from __future__ import division
import unittest
import rospy
import math
from geometry_msgs.msg import PoseStamped
from mavros_msgs.msg import Altitude, ExtendedState, HomePosition, State, \
WaypointList
from mavros_msgs.srv import CommandBool, ParamGet, SetMode, WaypointClear, \
WaypointPush
from sensor_msgs.msg import NavSatFix
class MavrosTestCommon(unittest.TestCase):
# dictionaries correspond to mavros ExtendedState msg
LAND_STATES = {
0: 'UNDEFINED',
1: 'ON_GROUND',
2: 'IN_AIR',
3: 'TAKEOFF',
4: 'LANDING'
}
VTOL_STATES = {
0: 'VTOL UNDEFINED',
1: 'VTOL MC-to-FW',
2: 'VTOL FW-to-MC',
3: 'VTOL MC',
4: 'VTOL FW'
}
def __init__(self, *args):
super(MavrosTestCommon, self).__init__(*args)
def setUp(self):
self.altitude = Altitude()
self.extended_state = ExtendedState()
self.global_position = NavSatFix()
self.home_position = HomePosition()
self.local_position = PoseStamped()
self.mission_wp = WaypointList()
self.state = State()
self.mav_type = None
self.sub_topics_ready = {
key: False
for key in [
'alt', 'ext_state', 'global_pos', 'home_pos', 'local_pos',
'mission_wp', 'state'
]
}
# ROS services
service_timeout = 30
rospy.loginfo("waiting for ROS services")
try:
rospy.wait_for_service('mavros/param/get', service_timeout)
rospy.wait_for_service('mavros/cmd/arming', service_timeout)
rospy.wait_for_service('mavros/mission/push', service_timeout)
rospy.wait_for_service('mavros/mission/clear', service_timeout)
rospy.wait_for_service('mavros/set_mode', service_timeout)
rospy.loginfo("ROS services are up")
except rospy.ROSException:
self.fail("failed to connect to services")
self.get_param_srv = rospy.ServiceProxy('mavros/param/get', ParamGet)
self.set_arming_srv = rospy.ServiceProxy('/mavros/cmd/arming',
CommandBool)
self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode)
self.wp_clear_srv = rospy.ServiceProxy('mavros/mission/clear',
WaypointClear)
self.wp_push_srv = rospy.ServiceProxy('mavros/mission/push',
WaypointPush)
# ROS subscribers
self.alt_sub = rospy.Subscriber('mavros/altitude', Altitude,
self.altitude_callback)
self.ext_state_sub = rospy.Subscriber('mavros/extended_state',
ExtendedState,
self.extended_state_callback)
self.global_pos_sub = rospy.Subscriber('mavros/global_position/global',
NavSatFix,
self.global_position_callback)
self.home_pos_sub = rospy.Subscriber('mavros/home_position/home',
HomePosition,
self.home_position_callback)
self.local_pos_sub = rospy.Subscriber('mavros/local_position/pose',
PoseStamped,
self.local_position_callback)
self.mission_wp_sub = rospy.Subscriber(
'mavros/mission/waypoints', WaypointList, self.mission_wp_callback)
self.state_sub = rospy.Subscriber('mavros/state', State,
self.state_callback)
#
# Callback functions
#
def altitude_callback(self, data):
self.altitude = data
# amsl has been observed to be nan while other fields are valid
if not self.sub_topics_ready['alt'] and not math.isnan(data.amsl):
self.sub_topics_ready['alt'] = True
def extended_state_callback(self, data):
if self.extended_state.vtol_state != data.vtol_state:
rospy.loginfo("VTOL state changed from {0} to {1}".format(
self.VTOL_STATES.get(self.extended_state.vtol_state),
self.VTOL_STATES.get(data.vtol_state)))
if self.extended_state.landed_state != data.landed_state:
rospy.loginfo("landed state changed from {0} to {1}".format(
self.LAND_STATES.get(self.extended_state.landed_state),
self.LAND_STATES.get(data.landed_state)))
self.extended_state = data
if not self.sub_topics_ready['ext_state']:
self.sub_topics_ready['ext_state'] = True
def global_position_callback(self, data):
self.global_position = data
if not self.sub_topics_ready['global_pos']:
self.sub_topics_ready['global_pos'] = True
def home_position_callback(self, data):
self.home_position = data
if not self.sub_topics_ready['home_pos']:
self.sub_topics_ready['home_pos'] = True
def local_position_callback(self, data):
self.local_position = data
if not self.sub_topics_ready['local_pos']:
self.sub_topics_ready['local_pos'] = True
def mission_wp_callback(self, data):
if self.mission_wp.current_seq != data.current_seq:
rospy.loginfo("current mission waypoint sequence updated: {0}".
format(data.current_seq))
self.mission_wp = data
if not self.sub_topics_ready['mission_wp']:
self.sub_topics_ready['mission_wp'] = True
def state_callback(self, data):
if self.state.armed != data.armed:
rospy.loginfo("armed state changed from {0} to {1}".format(
self.state.armed, data.armed))
if self.state.mode != data.mode:
rospy.loginfo("mode changed from {0} to {1}".format(
self.state.mode, data.mode))
self.state = data
# mavros publishes a disconnected state message on init
if not self.sub_topics_ready['state'] and data.connected:
self.sub_topics_ready['state'] = True
#
# Helper methods
#
def set_arm(self, arm, timeout):
"""arm: True to arm or False to disarm, timeout(int): seconds"""
rospy.loginfo("setting FCU arm: {0}".format(arm))
old_arm = self.state.armed
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
arm_set = False
for i in xrange(timeout * loop_freq):
if self.state.armed == arm:
arm_set = True
rospy.loginfo(
"set arm success | new arm: {0}, old arm: {1} | seconds: {2} of {3}".
format(arm, old_arm, i / loop_freq, timeout))
break
else:
try:
res = self.set_arming_srv(arm)
if not res.success:
rospy.logerr("failed to send arm command")
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(arm_set, (
"failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
format(arm, old_arm, timeout)))
def set_mode(self, mode, timeout):
"""mode: PX4 mode string, timeout(int): seconds"""
rospy.loginfo("setting FCU mode: {0}".format(mode))
old_mode = self.state.mode
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
mode_set = False
for i in xrange(timeout * loop_freq):
if self.state.mode == mode:
mode_set = True
rospy.loginfo(
"set mode success | new mode: {0}, old mode: {1} | seconds: {2} of {3}".
format(mode, old_mode, i / loop_freq, timeout))
break
else:
try:
res = self.set_mode_srv(0, mode) # 0 is custom mode
if not res.mode_sent:
rospy.logerr("failed to send mode command")
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(mode_set, (
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
format(mode, old_mode, timeout)))
def wait_for_topics(self, timeout):
"""wait for simulation to be ready, make sure we're getting topic info
from all topics by checking dictionary of flag values set in callbacks,
timeout(int): seconds"""
rospy.loginfo("waiting for subscribed topics to be ready")
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
simulation_ready = False
for i in xrange(timeout * loop_freq):
if all(value for value in self.sub_topics_ready.values()):
simulation_ready = True
rospy.loginfo("simulation topics ready | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
rate.sleep()
self.assertTrue(simulation_ready, (
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
format(self.sub_topics_ready, timeout)))
def wait_on_landed_state(self, desired_landed_state, timeout, index):
rospy.loginfo(
"waiting for landed state | state: {0}, index: {1}".format(
self.LAND_STATES.get(desired_landed_state), index))
loop_freq = 10 # Hz
rate = rospy.Rate(loop_freq)
landed_state_confirmed = False
for i in xrange(timeout * loop_freq):
if self.extended_state.landed_state == desired_landed_state:
landed_state_confirmed = True
rospy.loginfo(
"landed state confirmed | state: {0}, index: {1} | seconds: {2} of {3}".
format(
self.LAND_STATES.get(desired_landed_state), index, i /
loop_freq, timeout))
break
rate.sleep()
self.assertTrue(landed_state_confirmed, (
"landed state not detected | desired: {0}, current: {1} | index: {2}, timeout(seconds): {3}".
format(
self.LAND_STATES.get(desired_landed_state),
self.LAND_STATES.get(self.extended_state.landed_state), index,
timeout)))
def wait_on_transition(self, transition, timeout, index):
"""Wait for VTOL transition, timeout(int): seconds"""
rospy.loginfo(
"waiting for VTOL transition | transition: {0}, index: {1}".format(
self.VTOL_STATES.get(transition), index))
loop_freq = 10 # Hz
rate = rospy.Rate(loop_freq)
transitioned = False
for i in xrange(timeout * loop_freq):
if transition == self.extended_state.vtol_state:
rospy.loginfo(
"transitioned | index: {0} | seconds: {1} of {2}".format(
index, i / loop_freq, timeout))
transitioned = True
break
rate.sleep()
self.assertTrue(transitioned, (
"transition not detected | index: {0} | timeout(seconds): {1}".
format(index, timeout)))
def clear_wps(self, timeout):
"""timeout(int): seconds"""
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
wps_cleared = False
for i in xrange(timeout * loop_freq):
if not self.mission_wp.waypoints:
wps_cleared = True
rospy.loginfo("clear waypoints success | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
else:
try:
res = self.wp_clear_srv()
if not res.success:
rospy.logerr("failed to send waypoint clear command")
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(wps_cleared, (
"failed to clear waypoints | timeout(seconds): {0}".format(timeout)
))
def send_wps(self, waypoints, timeout):
"""waypoints, timeout(int): seconds"""
rospy.loginfo("sending mission waypoints")
if self.mission_wp.waypoints:
rospy.loginfo("FCU already has mission waypoints")
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
wps_sent = False
wps_verified = False
for i in xrange(timeout * loop_freq):
if not wps_sent:
try:
res = self.wp_push_srv(start_index=0, waypoints=waypoints)
wps_sent = res.success
if wps_sent:
rospy.loginfo("waypoints successfully transferred")
except rospy.ServiceException as e:
rospy.logerr(e)
else:
if len(waypoints) == len(self.mission_wp.waypoints):
rospy.loginfo("number of waypoints transferred: {0}".
format(len(waypoints)))
wps_verified = True
if wps_sent and wps_verified:
rospy.loginfo("send waypoints success | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
rate.sleep()
self.assertTrue((
wps_sent and wps_verified
), "mission could not be transferred and verified | timeout(seconds): {0}".
format(timeout))
def wait_for_mav_type(self, timeout):
"""Wait for MAV_TYPE parameter, timeout(int): seconds"""
rospy.loginfo("waiting for MAV_TYPE")
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
res = False
for i in xrange(timeout * loop_freq):
try:
res = self.get_param_srv('MAV_TYPE')
if res.success:
self.mav_type = res.value.integer
rospy.loginfo(
"MAV_TYPE received | value: {0} | seconds: {1} of {2}".
format(self.mav_type, i / loop_freq, timeout))
break
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(res.success, (
"MAV_TYPE param get failed | timeout(seconds): {0}".format(timeout)
))

403
integrationtests/python_src/px4_it/mavros/mission_test.py

@ -42,7 +42,6 @@ from __future__ import division @@ -42,7 +42,6 @@ from __future__ import division
PKG = 'px4'
import unittest
import rospy
import glob
import json
@ -52,12 +51,10 @@ import px4tools @@ -52,12 +51,10 @@ import px4tools
import sys
from mavros import mavlink
from mavros.mission import QGroundControlWP
from mavros_msgs.msg import ExtendedState, Mavlink, Waypoint
from mavros_test_common import MavrosTestCommon
from pymavlink import mavutil
from threading import Thread
from mavros_msgs.msg import Altitude, ExtendedState, HomePosition, Mavlink, \
State, Waypoint
from mavros_msgs.srv import CommandBool, SetMode, WaypointPush
from sensor_msgs.msg import NavSatFix
def get_last_log():
@ -71,90 +68,96 @@ def get_last_log(): @@ -71,90 +68,96 @@ def get_last_log():
return last_log
def read_new_mission(f):
def read_mission(mission_filename):
wps = []
with open(mission_filename, 'r') as f:
mission_filename_ext = os.path.splitext(mission_filename)[1]
if mission_filename_ext == '.plan':
for waypoint in read_plan_file(f):
wps.append(waypoint)
rospy.logdebug(waypoint)
elif mission_filename_ext == '.mission':
for waypoint in read_mission_file(f):
wps.append(waypoint)
rospy.logdebug(waypoint)
elif mission_filename_ext == '.txt':
mission = QGroundControlWP()
for waypoint in mission.read(f):
wps.append(waypoint)
rospy.logdebug(waypoint)
else:
raise IOError("unknown mission file extension",
mission_filename_ext)
# set first item to current
if wps[0]:
wps[0].is_current = True
return wps
def read_plan_file(f):
d = json.load(f)
current = True
for wp in d['items']:
yield Waypoint(
is_current=current,
frame=int(wp['frame']),
command=int(wp['command']),
param1=float(wp['param1']),
param2=float(wp['param2']),
param3=float(wp['param3']),
param4=float(wp['param4']),
x_lat=float(wp['coordinate'][0]),
y_long=float(wp['coordinate'][1]),
z_alt=float(wp['coordinate'][2]),
autocontinue=bool(wp['autoContinue']))
if current:
current = False
class MavrosMissionTest(unittest.TestCase):
if 'mission' in d:
d = d['mission']
if 'items' in d:
for wp in d['items']:
yield Waypoint(
is_current=False,
frame=int(wp['frame']),
command=int(wp['command']),
param1=float('nan'
if wp['params'][0] is None else wp['params'][0]),
param2=float('nan'
if wp['params'][1] is None else wp['params'][1]),
param3=float('nan'
if wp['params'][2] is None else wp['params'][2]),
param4=float('nan'
if wp['params'][3] is None else wp['params'][3]),
x_lat=float(wp['params'][4]),
y_long=float(wp['params'][5]),
z_alt=float(wp['params'][6]),
autocontinue=bool(wp['autoContinue']))
else:
raise IOError("no mission items")
def read_mission_file(f):
d = json.load(f)
if 'items' in d:
for wp in d['items']:
yield Waypoint(
is_current=False,
frame=int(wp['frame']),
command=int(wp['command']),
param1=float(wp['param1']),
param2=float(wp['param2']),
param3=float(wp['param3']),
param4=float(wp['param4']),
x_lat=float(wp['coordinate'][0]),
y_long=float(wp['coordinate'][1]),
z_alt=float(wp['coordinate'][2]),
autocontinue=bool(wp['autoContinue']))
else:
raise IOError("no mission items")
class MavrosMissionTest(MavrosTestCommon):
"""
Run a mission
"""
# dictionaries correspond to mavros ExtendedState msg
LAND_STATES = {
0: 'UNDEFINED',
1: 'ON_GROUND',
2: 'IN_AIR',
3: 'TAKEOFF',
4: 'LANDING'
}
VTOL_STATES = {
0: 'VTOL UNDEFINED',
1: 'VTOL MC->FW',
2: 'VTOL FW->MC',
3: 'VTOL MC',
4: 'VTOL FW'
}
def setUp(self):
self.rate = rospy.Rate(10) # 10hz
self.has_global_pos = False
self.global_position = NavSatFix()
self.extended_state = ExtendedState()
self.altitude = Altitude()
self.state = State()
super(self.__class__, self).setUp()
self.mc_rad = 5
self.fw_rad = 60
self.fw_alt_rad = 10
self.last_alt_d = None
self.last_pos_d = None
self.mission_name = ""
self.sub_topics_ready = {
key: False
for key in ['global_pos', 'home_pos', 'ext_state', 'alt', 'state']
}
# setup ROS topics and services
try:
rospy.wait_for_service('mavros/mission/push', 30)
rospy.wait_for_service('mavros/cmd/arming', 30)
rospy.wait_for_service('mavros/set_mode', 30)
except rospy.ROSException:
self.fail("failed to connect to mavros services")
self.wp_push_srv = rospy.ServiceProxy('mavros/mission/push',
WaypointPush)
self.set_arming_srv = rospy.ServiceProxy('/mavros/cmd/arming',
CommandBool)
self.set_mode_srv = rospy.ServiceProxy('/mavros/set_mode', SetMode)
self.global_pos_sub = rospy.Subscriber('mavros/global_position/global',
NavSatFix,
self.global_position_callback)
self.home_pos_sub = rospy.Subscriber('mavros/home_position/home',
HomePosition,
self.home_position_callback)
self.ext_state_sub = rospy.Subscriber('mavros/extended_state',
ExtendedState,
self.extended_state_callback)
self.alt_sub = rospy.Subscriber('mavros/altitude', Altitude,
self.altitude_callback)
self.state_sub = rospy.Subscriber('mavros/state', State,
self.state_callback)
self.mavlink_pub = rospy.Publisher('mavlink/to', Mavlink, queue_size=1)
# need to simulate heartbeat to prevent datalink loss detection
@ -169,61 +172,6 @@ class MavrosMissionTest(unittest.TestCase): @@ -169,61 +172,6 @@ class MavrosMissionTest(unittest.TestCase):
def tearDown(self):
pass
#
# Callback functions
#
def global_position_callback(self, data):
self.global_position = data
if not self.sub_topics_ready['global_pos']:
self.sub_topics_ready['global_pos'] = True
def home_position_callback(self, data):
# this topic publishing seems to be a better indicator that the sim
# is ready, it's not actually needed
self.home_pos_sub.unregister()
if not self.sub_topics_ready['home_pos']:
self.sub_topics_ready['home_pos'] = True
def extended_state_callback(self, data):
if self.extended_state.vtol_state != data.vtol_state:
rospy.loginfo("VTOL state changed from {0} to {1}".format(
self.VTOL_STATES.get(self.extended_state.vtol_state),
self.VTOL_STATES.get(data.vtol_state)))
if self.extended_state.landed_state != data.landed_state:
rospy.loginfo("landed state changed from {0} to {1}".format(
self.LAND_STATES.get(self.extended_state.landed_state),
self.LAND_STATES.get(data.landed_state)))
self.extended_state = data
if not self.sub_topics_ready['ext_state']:
self.sub_topics_ready['ext_state'] = True
def state_callback(self, data):
if self.state.armed != data.armed:
rospy.loginfo("armed state changed from {0} to {1}".format(
self.state.armed, data.armed))
if self.state.mode != data.mode:
rospy.loginfo("mode changed from {0} to {1}".format(
self.state.mode, data.mode))
self.state = data
# mavros publishes a disconnected state message on init
if not self.sub_topics_ready['state'] and data.connected:
self.sub_topics_ready['state'] = True
def altitude_callback(self, data):
self.altitude = data
# amsl has been observed to be nan while other fields are valid
if not self.sub_topics_ready['alt'] and not math.isnan(data.amsl):
self.sub_topics_ready['alt'] = True
#
# Helper methods
#
@ -236,60 +184,6 @@ class MavrosMissionTest(unittest.TestCase): @@ -236,60 +184,6 @@ class MavrosMissionTest(unittest.TestCase):
except rospy.ROSInterruptException:
pass
def set_mode(self, mode, timeout):
"""mode: PX4 mode string, timeout(int): seconds"""
old_mode = self.state.mode
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
mode_set = False
for i in xrange(timeout * loop_freq):
if self.state.mode == mode:
mode_set = True
rospy.loginfo(
"set mode success | new mode: {0}, old mode: {1} | seconds: {2} of {3}".
format(mode, old_mode, i / loop_freq, timeout))
break
else:
try:
res = self.set_mode_srv(0, mode) # 0 is custom mode
if not res.mode_sent:
rospy.logerr("failed to send mode command")
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(mode_set, (
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
format(mode, old_mode, timeout)))
def set_arm(self, arm, timeout):
"""arm: True to arm or False to disarm, timeout(int): seconds"""
old_arm = self.state.armed
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
arm_set = False
for i in xrange(timeout * loop_freq):
if self.state.armed == arm:
arm_set = True
rospy.loginfo(
"set arm success | new arm: {0}, old arm: {1} | seconds: {2} of {3}".
format(arm, old_arm, i / loop_freq, timeout))
break
else:
try:
res = self.set_arming_srv(arm)
if not res.success:
rospy.logerr("failed to send arm command")
except rospy.ServiceException as e:
rospy.logerr(e)
rate.sleep()
self.assertTrue(arm_set, (
"failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
format(arm, old_arm, timeout)))
def is_at_position(self, lat, lon, alt, xy_offset, z_offset):
"""alt(amsl), xy_offset, z_offset: meters"""
R = 6371000 # metres
@ -322,27 +216,27 @@ class MavrosMissionTest(unittest.TestCase): @@ -322,27 +216,27 @@ class MavrosMissionTest(unittest.TestCase):
self.last_pos_d = None
rospy.loginfo(
"trying to reach waypoint | lat: {0:13.9f}, lon: {1:13.9f}, alt: {2:6.2f}, index: {3}".
"trying to reach waypoint | lat: {0:.9f}, lon: {1:.9f}, alt: {2:.2f}, index: {3}".
format(lat, lon, alt, index))
# does it reach the position in 'timeout' seconds?
loop_freq = 10 # Hz
loop_freq = 2 # Hz
rate = rospy.Rate(loop_freq)
reached = False
for i in xrange(timeout * loop_freq):
# use MC radius by default
# FIXME: also check MAV_TYPE from system status, otherwise pure fixed-wing won't work
xy_radius = self.mc_rad
z_radius = self.mc_rad
# use FW radius if in FW or in transition
if (self.extended_state.vtol_state == ExtendedState.VTOL_STATE_FW
or self.extended_state.vtol_state ==
# use FW radius if VTOL in FW or transition or FW
if (self.mav_type == mavutil.mavlink.MAV_TYPE_FIXED_WING or
self.extended_state.vtol_state ==
ExtendedState.VTOL_STATE_FW or
self.extended_state.vtol_state ==
ExtendedState.VTOL_STATE_TRANSITION_TO_MC or
self.extended_state.vtol_state ==
ExtendedState.VTOL_STATE_TRANSITION_TO_FW):
xy_radius = self.fw_rad
z_radius = self.fw_alt_rad
else: # assume MC
xy_radius = self.mc_rad
z_radius = self.mc_rad
if self.is_at_position(lat, lon, alt, xy_radius, z_radius):
reached = True
@ -355,78 +249,9 @@ class MavrosMissionTest(unittest.TestCase): @@ -355,78 +249,9 @@ class MavrosMissionTest(unittest.TestCase):
rate.sleep()
self.assertTrue(reached, (
"({0}) took too long to get to position | lat: {1:13.9f}, lon: {2:13.9f}, alt: {3:6.2f}, xy off: {4}, z off: {5}, pos_d: {6:.2f}, alt_d: {7:.2f}, VTOL state: {8}, index: {9} | timeout(seconds): {10}".
format(self.mission_name, lat, lon, alt, xy_radius, z_radius,
self.last_pos_d, self.last_alt_d,
self.VTOL_STATES.get(self.extended_state.vtol_state), index,
timeout)))
def wait_for_topics(self, timeout):
"""wait for simulation to be ready, make sure we're getting topic info
from all topics by checking dictionary of flag values set in callbacks,
timeout(int): seconds"""
rospy.loginfo("waiting for simulation topics to be ready")
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
simulation_ready = False
for i in xrange(timeout * loop_freq):
if all(value for value in self.sub_topics_ready.values()):
simulation_ready = True
rospy.loginfo("simulation topics ready | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
rate.sleep()
self.assertTrue(simulation_ready, (
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
format(self.sub_topics_ready, timeout)))
def wait_on_landed_state(self, desired_landed_state, timeout, index):
rospy.loginfo(
"waiting for landed state | state: {0}, index: {1}".format(
self.LAND_STATES.get(desired_landed_state), index))
loop_freq = 10 # Hz
rate = rospy.Rate(loop_freq)
landed_state_confirmed = False
for i in xrange(timeout * loop_freq):
if self.extended_state.landed_state == desired_landed_state:
landed_state_confirmed = True
rospy.loginfo(
"landed state confirmed | state: {0}, index: {1}".format(
self.LAND_STATES.get(desired_landed_state), index))
break
rate.sleep()
self.assertTrue(landed_state_confirmed, (
"({0}) landed state not detected | desired: {1}, current: {2} | index: {3}, timeout(seconds): {4}".
format(self.mission_name,
self.LAND_STATES.get(desired_landed_state),
self.LAND_STATES.get(self.extended_state.landed_state),
index, timeout)))
def wait_on_transition(self, transition, timeout, index):
"""Wait for VTOL transition, timeout(int): seconds"""
rospy.loginfo(
"waiting for VTOL transition | transition: {0}, index: {1}".format(
self.VTOL_STATES.get(transition), index))
loop_freq = 10 # Hz
rate = rospy.Rate(loop_freq)
transitioned = False
for i in xrange(timeout * loop_freq):
if transition == self.extended_state.vtol_state:
rospy.loginfo(
"transitioned | index: {0} | seconds: {1} of {2}".format(
index, i / loop_freq, timeout))
transitioned = True
break
rate.sleep()
self.assertTrue(transitioned, (
"({0}) transition not detected | index: {1} | timeout(seconds): {2}, ".
format(self.mission_name, index, timeout)))
"took too long to get to position | lat: {0:.9f}, lon: {1:.9f}, alt: {2:.2f}, xy off: {3}, z off: {4}, pos_d: {5:.2f}, alt_d: {6:.2f}, index: {7} | timeout(seconds): {8}".
format(lat, lon, alt, xy_radius, z_radius, self.last_pos_d,
self.last_alt_d, index, timeout)))
#
# Test method
@ -443,46 +268,22 @@ class MavrosMissionTest(unittest.TestCase): @@ -443,46 +268,22 @@ class MavrosMissionTest(unittest.TestCase):
os.path.realpath(__file__)) + "/" + sys.argv[1]
rospy.loginfo("reading mission {0}".format(mission_file))
wps = []
with open(mission_file, 'r') as f:
mission_ext = os.path.splitext(mission_file)[1]
if mission_ext == '.mission':
rospy.loginfo("new style mission file detected")
for waypoint in read_new_mission(f):
wps.append(waypoint)
rospy.logdebug(waypoint)
elif mission_ext == '.txt':
rospy.loginfo("old style mission file detected")
mission = QGroundControlWP()
for waypoint in mission.read(f):
wps.append(waypoint)
rospy.logdebug(waypoint)
else:
raise IOError('unknown mission file extension', mission_ext)
rospy.loginfo("send mission")
result = False
try:
res = self.wp_push_srv(start_index=0, waypoints=wps)
result = res.success
except rospy.ServiceException as e:
rospy.logerr(e)
self.assertTrue(
result,
"({0}) mission could not be transfered".format(self.mission_name))
# delay starting the mission
self.wait_for_topics(30)
wps = read_mission(mission_file)
except IOError as e:
self.fail(e)
# make sure the simulation is ready to start the mission
self.wait_for_topics(60)
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND, 10, -1)
self.wait_for_mav_type(10)
rospy.loginfo("seting mission mode")
# push waypoints to FCU and start mission
self.send_wps(wps, 30)
self.set_mode("AUTO.MISSION", 5)
rospy.loginfo("arming")
self.set_arm(True, 5)
rospy.loginfo("run mission")
rospy.loginfo("run mission {0}".format(self.mission_name))
for index, waypoint in enumerate(wps):
# only check position for waypoints where this makes sense
if waypoint.frame == Waypoint.FRAME_GLOBAL_REL_ALT or waypoint.frame == Waypoint.FRAME_GLOBAL:
@ -500,7 +301,7 @@ class MavrosMissionTest(unittest.TestCase): @@ -500,7 +301,7 @@ class MavrosMissionTest(unittest.TestCase):
if waypoint.command == 84: # VTOL takeoff implies transition to FW
transition = ExtendedState.VTOL_STATE_FW
if waypoint.command == 85: # VTOL takeoff implies transition to MC
if waypoint.command == 85: # VTOL land implies transition to MC
transition = ExtendedState.VTOL_STATE_MC
self.wait_on_transition(transition, 60, index)
@ -510,8 +311,8 @@ class MavrosMissionTest(unittest.TestCase): @@ -510,8 +311,8 @@ class MavrosMissionTest(unittest.TestCase):
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND,
60, index)
rospy.loginfo("disarming")
self.set_arm(False, 5)
self.clear_wps(5)
rospy.loginfo("mission done, calculating performance metrics")
last_log = get_last_log()

3
test/mavros_posix_test_mission.test

@ -6,12 +6,13 @@ @@ -6,12 +6,13 @@
<arg name="gui" default="false"/>
<arg name="est" default="ekf2"/>
<arg name="mission"/>
<arg name="vehicle"/>
<include file="$(find px4)/launch/mavros_posix_sitl.launch">
<arg name="ns" value="$(arg ns)"/>
<arg name="headless" value="$(arg headless)"/>
<arg name="gui" value="$(arg gui)"/>
<arg name="vehicle" value="standard_vtol"/>
<arg name="vehicle" value="$(arg vehicle)"/>
<arg name="est" value="$(arg est)"/>
</include>

12
test/mavros_posix_tests_missions.test

@ -15,11 +15,11 @@ @@ -15,11 +15,11 @@
</include>
<group ns="$(arg ns)">
<test test-name="multicopter_box" pkg="px4" type="mission_test.py" vehicle="iris" time-limit="120.0" args="multirotor_box.mission"/>
<test test-name="mission_test_new_1" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_new_1.txt" />
<test test-name="mission_test_new_2" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_new_2.txt" />
<test test-name="mission_test_old_1" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_old_1.txt" />
<test test-name="mission_test_old_2" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_old_2.txt" />
<test test-name="mission_test_old_3" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_old_3.txt" />
<test test-name="multicopter_box" pkg="px4" type="mission_test.py" time-limit="120.0" args="multirotor_box.mission"/>
<test test-name="mission_test_new_1" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_new_1.txt"/>
<test test-name="mission_test_new_2" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_new_2.txt"/>
<test test-name="mission_test_old_1" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_old_1.txt"/>
<test test-name="mission_test_old_2" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_old_2.txt"/>
<test test-name="mission_test_old_3" pkg="px4" type="mission_test.py" time-limit="300.0" args="vtol_old_3.txt"/>
</group>
</launch>

Loading…
Cancel
Save