|
|
@ -74,6 +74,7 @@ class MavrosMissionTest(unittest.TestCase): |
|
|
|
self.fw_alt_rad = 10 |
|
|
|
self.fw_alt_rad = 10 |
|
|
|
self.last_alt_d = 9999 |
|
|
|
self.last_alt_d = 9999 |
|
|
|
self.last_pos_d = 9999 |
|
|
|
self.last_pos_d = 9999 |
|
|
|
|
|
|
|
self.mission_name = "" |
|
|
|
|
|
|
|
|
|
|
|
# need to simulate heartbeat for datalink loss detection |
|
|
|
# need to simulate heartbeat for datalink loss detection |
|
|
|
rospy.Timer(rospy.Duration(0.5), self.send_heartbeat) |
|
|
|
rospy.Timer(rospy.Duration(0.5), self.send_heartbeat) |
|
|
@ -163,9 +164,9 @@ class MavrosMissionTest(unittest.TestCase): |
|
|
|
count = count + 1 |
|
|
|
count = count + 1 |
|
|
|
self.rate.sleep() |
|
|
|
self.rate.sleep() |
|
|
|
|
|
|
|
|
|
|
|
self.assertTrue(count < timeout, ("took too long to get to position " + |
|
|
|
self.assertTrue(count < timeout, (("(%s) took too long to get to position " + |
|
|
|
"lat: %f, lon: %f, alt: %f, xy off: %f, z off: %f, timeout: %d, index: %d, pos_d: %f, alt_d: %f" % |
|
|
|
"lat: %f, lon: %f, alt: %f, xy off: %f, z off: %f, timeout: %d, index: %d, pos_d: %f, alt_d: %f") % |
|
|
|
(lat, lon, alt, xy_radius, z_radius, timeout, index, self.last_pos_d, self.last_alt_d))) |
|
|
|
(self.mission_name, lat, lon, alt, xy_radius, z_radius, timeout, index, self.last_pos_d, self.last_alt_d))) |
|
|
|
|
|
|
|
|
|
|
|
def run_mission(self): |
|
|
|
def run_mission(self): |
|
|
|
"""switch mode: armed | auto""" |
|
|
|
"""switch mode: armed | auto""" |
|
|
@ -193,9 +194,9 @@ class MavrosMissionTest(unittest.TestCase): |
|
|
|
count = count + 1 |
|
|
|
count = count + 1 |
|
|
|
self.rate.sleep() |
|
|
|
self.rate.sleep() |
|
|
|
|
|
|
|
|
|
|
|
self.assertTrue(count < timeout, ("landing not detected after landing WP " + |
|
|
|
self.assertTrue(count < timeout, ("(%s) landing not detected after landing WP " + |
|
|
|
"timeout: %d, index: %d") % |
|
|
|
"timeout: %d, index: %d") % |
|
|
|
(timeout, index)) |
|
|
|
(self.mission_name, timeout, index)) |
|
|
|
|
|
|
|
|
|
|
|
def wait_on_transition(self, transition, timeout, index): |
|
|
|
def wait_on_transition(self, transition, timeout, index): |
|
|
|
"""Wait for VTOL transition""" |
|
|
|
"""Wait for VTOL transition""" |
|
|
@ -219,9 +220,9 @@ class MavrosMissionTest(unittest.TestCase): |
|
|
|
count = count + 1 |
|
|
|
count = count + 1 |
|
|
|
self.rate.sleep() |
|
|
|
self.rate.sleep() |
|
|
|
|
|
|
|
|
|
|
|
self.assertTrue(count < timeout, ("transition not detected " + |
|
|
|
self.assertTrue(count < timeout, ("(%s) transition not detected " + |
|
|
|
"timeout: %d, index: %d") % |
|
|
|
"timeout: %d, index: %d") % |
|
|
|
(timeout, index)) |
|
|
|
(self.mission_name, timeout, index)) |
|
|
|
|
|
|
|
|
|
|
|
def send_heartbeat(self, event=None): |
|
|
|
def send_heartbeat(self, event=None): |
|
|
|
# mav type gcs |
|
|
|
# mav type gcs |
|
|
@ -238,12 +239,13 @@ class MavrosMissionTest(unittest.TestCase): |
|
|
|
self.fail("usage: mission_test.py mission_file") |
|
|
|
self.fail("usage: mission_test.py mission_file") |
|
|
|
return |
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
file = os.path.dirname(os.path.realpath(__file__)) + "/" + sys.argv[1] |
|
|
|
self.mission_name = sys.argv[1] |
|
|
|
|
|
|
|
mission_file = os.path.dirname(os.path.realpath(__file__)) + "/" + sys.argv[1] |
|
|
|
|
|
|
|
|
|
|
|
rospy.loginfo("reading mission %s", file) |
|
|
|
rospy.loginfo("reading mission %s", mission_file) |
|
|
|
mission = QGroundControlWP() |
|
|
|
mission = QGroundControlWP() |
|
|
|
wps = [] |
|
|
|
wps = [] |
|
|
|
for waypoint in mission.read(open(file, 'r')): |
|
|
|
for waypoint in mission.read(open(mission_file, 'r')): |
|
|
|
wps.append(waypoint) |
|
|
|
wps.append(waypoint) |
|
|
|
rospy.logdebug(waypoint) |
|
|
|
rospy.logdebug(waypoint) |
|
|
|
|
|
|
|
|
|
|
@ -253,7 +255,7 @@ class MavrosMissionTest(unittest.TestCase): |
|
|
|
rospy.loginfo("send mission") |
|
|
|
rospy.loginfo("send mission") |
|
|
|
res = self._srv_wp_push(wps) |
|
|
|
res = self._srv_wp_push(wps) |
|
|
|
rospy.loginfo(res) |
|
|
|
rospy.loginfo(res) |
|
|
|
self.assertTrue(res.success, "mission could not be transfered") |
|
|
|
self.assertTrue(res.success, "(%s) mission could not be transfered" % self.mission_name) |
|
|
|
|
|
|
|
|
|
|
|
rospy.loginfo("run mission") |
|
|
|
rospy.loginfo("run mission") |
|
|
|
self.run_mission() |
|
|
|
self.run_mission() |
|
|
|