|
|
|
@ -42,6 +42,7 @@ import rospy
@@ -42,6 +42,7 @@ import rospy
|
|
|
|
|
import math |
|
|
|
|
import rosbag |
|
|
|
|
import sys |
|
|
|
|
import os |
|
|
|
|
|
|
|
|
|
import mavros |
|
|
|
|
from pymavlink import mavutil |
|
|
|
@ -49,7 +50,7 @@ from mavros import mavlink
@@ -49,7 +50,7 @@ from mavros import mavlink
|
|
|
|
|
|
|
|
|
|
from geometry_msgs.msg import PoseStamped |
|
|
|
|
from mavros_msgs.srv import CommandLong, WaypointPush |
|
|
|
|
from mavros_msgs.msg import Mavlink, Waypoint |
|
|
|
|
from mavros_msgs.msg import Mavlink, Waypoint, ExtendedState |
|
|
|
|
from sensor_msgs.msg import NavSatFix |
|
|
|
|
from mavros.mission import QGroundControlWP |
|
|
|
|
#from px4_test_helper import PX4TestHelper |
|
|
|
@ -66,6 +67,7 @@ class MavrosMissionTest(unittest.TestCase):
@@ -66,6 +67,7 @@ class MavrosMissionTest(unittest.TestCase):
|
|
|
|
|
|
|
|
|
|
rospy.Subscriber("mavros/local_position/pose", PoseStamped, self.position_callback) |
|
|
|
|
rospy.Subscriber("mavros/global_position/global", NavSatFix, self.global_position_callback) |
|
|
|
|
rospy.Subscriber("mavros/extended_state", ExtendedState, self.extended_state_callback) |
|
|
|
|
self.pub_mavlink = rospy.Publisher('mavlink/to', Mavlink, queue_size=1) |
|
|
|
|
rospy.wait_for_service('mavros/cmd/command', 30) |
|
|
|
|
self._srv_cmd_long = rospy.ServiceProxy('mavros/cmd/command', CommandLong, persistent=True) |
|
|
|
@ -74,7 +76,10 @@ class MavrosMissionTest(unittest.TestCase):
@@ -74,7 +76,10 @@ class MavrosMissionTest(unittest.TestCase):
|
|
|
|
|
self.has_global_pos = False |
|
|
|
|
self.local_position = PoseStamped() |
|
|
|
|
self.global_position = NavSatFix() |
|
|
|
|
self.extended_state = ExtendedState() |
|
|
|
|
self.home_alt = 0 |
|
|
|
|
self.mc_rad = 5 |
|
|
|
|
self.fw_rad = 40 |
|
|
|
|
|
|
|
|
|
# need to simulate heartbeat for datalink loss detection |
|
|
|
|
rospy.Timer(rospy.Duration(0.5), self.send_heartbeat) |
|
|
|
@ -96,6 +101,9 @@ class MavrosMissionTest(unittest.TestCase):
@@ -96,6 +101,9 @@ class MavrosMissionTest(unittest.TestCase):
|
|
|
|
|
self.home_alt = data.altitude |
|
|
|
|
self.has_global_pos = True |
|
|
|
|
|
|
|
|
|
def extended_state_callback(self, data): |
|
|
|
|
self.extended_state = data |
|
|
|
|
|
|
|
|
|
# |
|
|
|
|
# Helper methods |
|
|
|
|
# |
|
|
|
@ -115,13 +123,18 @@ class MavrosMissionTest(unittest.TestCase):
@@ -115,13 +123,18 @@ class MavrosMissionTest(unittest.TestCase):
|
|
|
|
|
d = R * c |
|
|
|
|
alt_d = abs(alt - self.global_position.altitude) |
|
|
|
|
|
|
|
|
|
rospy.loginfo("d: %f, alt_d: %f", d, alt_d) |
|
|
|
|
#rospy.loginfo("d: %f, alt_d: %f", d, alt_d) |
|
|
|
|
return d < offset and alt_d < offset |
|
|
|
|
|
|
|
|
|
def reach_position(self, lat, lon, alt, radius, timeout): |
|
|
|
|
def reach_position(self, lat, lon, alt, timeout): |
|
|
|
|
# does it reach the position in X seconds? |
|
|
|
|
count = 0 |
|
|
|
|
while count < timeout: |
|
|
|
|
# use different radius matching vehicle state |
|
|
|
|
radius = self.mc_rad |
|
|
|
|
if self.extended_state.vtol_state == ExtendedState.VTOL_STATE_FW: |
|
|
|
|
radius = self.fw_rad |
|
|
|
|
|
|
|
|
|
if self.is_at_position(lat, lon, alt, radius): |
|
|
|
|
break |
|
|
|
|
count = count + 1 |
|
|
|
@ -153,13 +166,15 @@ class MavrosMissionTest(unittest.TestCase):
@@ -153,13 +166,15 @@ class MavrosMissionTest(unittest.TestCase):
|
|
|
|
|
"""Test mission""" |
|
|
|
|
|
|
|
|
|
if len(sys.argv) < 2: |
|
|
|
|
self.fail("no mission argument") |
|
|
|
|
self.fail("usage: mission_test.py mission_file") |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
rospy.loginfo("reading mission") |
|
|
|
|
file = os.path.dirname(os.path.realpath(__file__)) + "/" + sys.argv[1] |
|
|
|
|
|
|
|
|
|
rospy.loginfo("reading mission %s", file) |
|
|
|
|
mission = QGroundControlWP() |
|
|
|
|
wps = [] |
|
|
|
|
for wp in mission.read(open(sys.argv[1], 'r')): |
|
|
|
|
for wp in mission.read(open(file, 'r')): |
|
|
|
|
wps.append(wp) |
|
|
|
|
rospy.logdebug(wp) |
|
|
|
|
|
|
|
|
@ -187,7 +202,7 @@ class MavrosMissionTest(unittest.TestCase):
@@ -187,7 +202,7 @@ class MavrosMissionTest(unittest.TestCase):
|
|
|
|
|
alt = wp.z_alt |
|
|
|
|
if wp.frame == Waypoint.FRAME_GLOBAL_REL_ALT: |
|
|
|
|
alt += self.home_alt |
|
|
|
|
self.reach_position(wp.x_lat, wp.y_long, alt, 10, 300) |
|
|
|
|
self.reach_position(wp.x_lat, wp.y_long, alt, 600) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
|