#!/usr/bin/env python2
#***************************************************************************
#
# Copyright (c) 2015-2016 PX4 Development Team. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# 3. Neither the name PX4 nor the names of its contributors may be
# used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
#***************************************************************************/
#
# @author Andreas Antener <andreas@uaventure.com>
#
# The shebang of this file is currently Python2 because some
# dependencies such as pymavlink don't play well with Python3 yet.
from __future__ import division
PKG = ' px4 '
import rospy
import glob
import json
import math
import os
import px4tools
import sys
from mavros import mavlink
from mavros . mission import QGroundControlWP
from mavros_msgs . msg import Mavlink , Waypoint
from mavros_test_common import MavrosTestCommon
from pymavlink import mavutil
from threading import Thread
def get_last_log ( ) :
try :
log_path = os . environ [ ' PX4_LOG_DIR ' ]
except KeyError :
log_path = os . path . join ( os . environ [ ' HOME ' ] ,
' .ros/rootfs/fs/microsd/log ' )
last_log_dir = sorted ( glob . glob ( os . path . join ( log_path , ' * ' ) ) ) [ - 1 ]
last_log = sorted ( glob . glob ( os . path . join ( last_log_dir , ' *.ulg ' ) ) ) [ - 1 ]
return last_log
def read_mission ( mission_filename ) :
wps = [ ]
with open ( mission_filename , ' r ' ) as f :
mission_filename_ext = os . path . splitext ( mission_filename ) [ 1 ]
if mission_filename_ext == ' .plan ' :
for waypoint in read_plan_file ( f ) :
wps . append ( waypoint )
rospy . logdebug ( waypoint )
elif mission_filename_ext == ' .mission ' :
for waypoint in read_mission_file ( f ) :
wps . append ( waypoint )
rospy . logdebug ( waypoint )
elif mission_filename_ext == ' .txt ' :
mission = QGroundControlWP ( )
for waypoint in mission . read ( f ) :
wps . append ( waypoint )
rospy . logdebug ( waypoint )
else :
raise IOError ( " unknown mission file extension " ,
mission_filename_ext )
# set first item to current
if wps [ 0 ] :
wps [ 0 ] . is_current = True
return wps
def read_plan_file ( f ) :
d = json . load ( f )
if ' mission ' in d :
d = d [ ' mission ' ]
if ' items ' in d :
for wp in d [ ' items ' ] :
yield Waypoint (
is_current = False ,
frame = int ( wp [ ' frame ' ] ) ,
command = int ( wp [ ' command ' ] ) ,
param1 = float ( ' nan '
if wp [ ' params ' ] [ 0 ] is None else wp [ ' params ' ] [ 0 ] ) ,
param2 = float ( ' nan '
if wp [ ' params ' ] [ 1 ] is None else wp [ ' params ' ] [ 1 ] ) ,
param3 = float ( ' nan '
if wp [ ' params ' ] [ 2 ] is None else wp [ ' params ' ] [ 2 ] ) ,
param4 = float ( ' nan '
if wp [ ' params ' ] [ 3 ] is None else wp [ ' params ' ] [ 3 ] ) ,
x_lat = float ( wp [ ' params ' ] [ 4 ] ) ,
y_long = float ( wp [ ' params ' ] [ 5 ] ) ,
z_alt = float ( wp [ ' params ' ] [ 6 ] ) ,
autocontinue = bool ( wp [ ' autoContinue ' ] ) )
else :
raise IOError ( " no mission items " )
def read_mission_file ( f ) :
d = json . load ( f )
if ' items ' in d :
for wp in d [ ' items ' ] :
yield Waypoint (
is_current = False ,
frame = int ( wp [ ' frame ' ] ) ,
command = int ( wp [ ' command ' ] ) ,
param1 = float ( wp [ ' param1 ' ] ) ,
param2 = float ( wp [ ' param2 ' ] ) ,
param3 = float ( wp [ ' param3 ' ] ) ,
param4 = float ( wp [ ' param4 ' ] ) ,
x_lat = float ( wp [ ' coordinate ' ] [ 0 ] ) ,
y_long = float ( wp [ ' coordinate ' ] [ 1 ] ) ,
z_alt = float ( wp [ ' coordinate ' ] [ 2 ] ) ,
autocontinue = bool ( wp [ ' autoContinue ' ] ) )
else :
raise IOError ( " no mission items " )
class MavrosMissionTest ( MavrosTestCommon ) :
"""
Run a mission
"""
def setUp ( self ) :
super ( self . __class__ , self ) . setUp ( )
self . mc_rad = 5 # meters
self . fw_rad = 60
self . fw_alt_rad = 10
self . mission_name = " "
self . mavlink_pub = rospy . Publisher ( ' mavlink/to ' , Mavlink , queue_size = 1 )
# need to simulate heartbeat to prevent datalink loss detection
self . hb_mav_msg = mavutil . mavlink . MAVLink_heartbeat_message (
mavutil . mavlink . MAV_TYPE_GCS , 0 , 0 , 0 , 0 , 0 )
self . hb_mav_msg . pack ( mavutil . mavlink . MAVLink ( ' ' , 2 , 1 ) )
self . hb_ros_msg = mavlink . convert_to_rosmsg ( self . hb_mav_msg )
self . hb_thread = Thread ( target = self . send_heartbeat , args = ( ) )
self . hb_thread . daemon = True
self . hb_thread . start ( )
def tearDown ( self ) :
super ( MavrosMissionTest , self ) . tearDown ( )
#
# Helper methods
#
def send_heartbeat ( self ) :
rate = rospy . Rate ( 2 ) # Hz
while not rospy . is_shutdown ( ) :
self . mavlink_pub . publish ( self . hb_ros_msg )
try : # prevent garbage in console output when thread is killed
rate . sleep ( )
except rospy . ROSInterruptException :
pass
def is_at_position ( self , lat , lon , alt , xy_offset , z_offset ) :
""" alt(amsl), xy_offset, z_offset: meters """
R = 6371000 # metres
rlat1 = math . radians ( lat )
rlat2 = math . radians ( self . global_position . latitude )
rlat_d = math . radians ( self . global_position . latitude - lat )
rlon_d = math . radians ( self . global_position . longitude - lon )
a = ( math . sin ( rlat_d / 2 ) * math . sin ( rlat_d / 2 ) + math . cos ( rlat1 ) *
math . cos ( rlat2 ) * math . sin ( rlon_d / 2 ) * math . sin ( rlon_d / 2 ) )
c = 2 * math . atan2 ( math . sqrt ( a ) , math . sqrt ( 1 - a ) )
d = R * c
alt_d = abs ( alt - self . altitude . amsl )
rospy . logdebug ( " d: {0} , alt_d: {1} " . format ( d , alt_d ) )
return ( d < xy_offset and alt_d < z_offset ) , d , alt_d
def reach_position ( self , lat , lon , alt , timeout , index ) :
""" alt(amsl): meters, timeout(int): seconds """
rospy . loginfo (
" trying to reach waypoint | lat: {0:.9f} , lon: {1:.9f} , alt: {2:.2f} , index: {3} " .
format ( lat , lon , alt , index ) )
best_pos_xy_d = None
best_pos_z_d = None
fcu_advanced = False
# does it reach the position in 'timeout' seconds?
loop_freq = 2 # Hz
rate = rospy . Rate ( loop_freq )
for i in xrange ( timeout * loop_freq ) :
# use FW radius if VTOL in FW or transition or FW
if ( self . mav_type == mavutil . mavlink . MAV_TYPE_FIXED_WING or
self . extended_state . vtol_state ==
mavutil . mavlink . MAV_VTOL_STATE_FW or
self . extended_state . vtol_state ==
mavutil . mavlink . MAV_VTOL_STATE_TRANSITION_TO_MC or
self . extended_state . vtol_state ==
mavutil . mavlink . MAV_VTOL_STATE_TRANSITION_TO_FW ) :
xy_radius = self . fw_rad
z_radius = self . fw_alt_rad
else : # assume MC
xy_radius = self . mc_rad
z_radius = self . mc_rad
reached , pos_xy_d , pos_z_d = self . is_at_position (
lat , lon , alt , xy_radius , z_radius )
# remember best distances
if not best_pos_xy_d or best_pos_xy_d > pos_xy_d :
best_pos_xy_d = pos_xy_d
if not best_pos_z_d or best_pos_z_d > pos_z_d :
best_pos_z_d = pos_z_d
if reached :
rospy . loginfo (
" position reached | pos_xy_d: {0:.2f} , pos_z_d: {1:.2f} , index: {2} | seconds: {3} of {4} " .
format ( pos_xy_d , pos_z_d , index , i / loop_freq , timeout ) )
break
elif not fcu_advanced and ( index < self . mission_wp . current_seq or (
index == len ( self . mission_wp . waypoints ) and
self . mission_wp . current_seq == 0 ) ) :
# FCU advanced to the next mission item, or finished mission
fcu_advanced = True
rospy . loginfo (
" FCU advanced to mission item index {0} when checking position | xy off: {1} , z off: {2} , pos_xy_d: {3:.2f} , pos_z_d: {4:.2f} , " .
format ( self . mission_wp . current_seq , xy_radius , z_radius ,
pos_xy_d , pos_z_d ) )
rate . sleep ( )
self . assertTrue (
reached ,
" took too long to get to position | lat: {0:.9f} , lon: {1:.9f} , alt: {2:.2f} , xy off: {3} , z off: {4} , current pos_xy_d: {5:.2f} , current pos_z_d: {6:.2f} , best pos_xy_d: {7:.2f} , best pos_z_d: {8:.2f} , index: {9} | timeout(seconds): {10} " .
format ( lat , lon , alt , xy_radius , z_radius , pos_xy_d , pos_z_d ,
best_pos_xy_d , best_pos_z_d , index , timeout ) )
#
# Test method
#
def test_mission ( self ) :
""" Test mission """
if len ( sys . argv ) < 2 :
self . fail ( " usage: mission_test.py mission_file " )
return
self . mission_name = sys . argv [ 1 ]
mission_file = os . path . dirname (
os . path . realpath ( __file__ ) ) + " / " + sys . argv [ 1 ]
rospy . loginfo ( " reading mission {0} " . format ( mission_file ) )
try :
wps = read_mission ( mission_file )
except IOError as e :
self . fail ( e )
# make sure the simulation is ready to start the mission
self . wait_for_topics ( 60 )
self . wait_for_landed_state ( mavutil . mavlink . MAV_LANDED_STATE_ON_GROUND ,
10 , - 1 )
self . wait_for_mav_type ( 10 )
# push waypoints to FCU and start mission
self . send_wps ( wps , 30 )
self . log_topic_vars ( )
self . set_mode ( " AUTO.MISSION " , 5 )
self . set_arm ( True , 5 )
rospy . loginfo ( " run mission {0} " . format ( self . mission_name ) )
for index , waypoint in enumerate ( wps ) :
# only check position for waypoints where this makes sense
if ( waypoint . frame == Waypoint . FRAME_GLOBAL_REL_ALT or
waypoint . frame == Waypoint . FRAME_GLOBAL ) :
alt = waypoint . z_alt
if waypoint . frame == Waypoint . FRAME_GLOBAL_REL_ALT :
alt + = self . altitude . amsl - self . altitude . relative
self . reach_position ( waypoint . x_lat , waypoint . y_long , alt , 60 ,
index )
# check if VTOL transition happens if applicable
if ( waypoint . command == mavutil . mavlink . MAV_CMD_NAV_VTOL_TAKEOFF or
waypoint . command == mavutil . mavlink . MAV_CMD_NAV_VTOL_LAND
or waypoint . command ==
mavutil . mavlink . MAV_CMD_DO_VTOL_TRANSITION ) :
transition = waypoint . param1 # used by MAV_CMD_DO_VTOL_TRANSITION
if waypoint . command == mavutil . mavlink . MAV_CMD_NAV_VTOL_TAKEOFF : # VTOL takeoff implies transition to FW
transition = mavutil . mavlink . MAV_VTOL_STATE_FW
if waypoint . command == mavutil . mavlink . MAV_CMD_NAV_VTOL_LAND : # VTOL land implies transition to MC
transition = mavutil . mavlink . MAV_VTOL_STATE_MC
self . wait_for_vtol_state ( transition , 60 , index )
# after reaching position, wait for landing detection if applicable
if ( waypoint . command == mavutil . mavlink . MAV_CMD_NAV_VTOL_LAND or
waypoint . command == mavutil . mavlink . MAV_CMD_NAV_LAND ) :
self . wait_for_landed_state (
mavutil . mavlink . MAV_LANDED_STATE_ON_GROUND , 60 , index )
self . set_arm ( False , 5 )
self . clear_wps ( 5 )
rospy . loginfo ( " mission done, calculating performance metrics " )
last_log = get_last_log ( )
rospy . loginfo ( " log file {0} " . format ( last_log ) )
data = px4tools . read_ulog ( last_log ) . concat ( dt = 0.1 )
data = px4tools . compute_data ( data )
res = px4tools . estimator_analysis ( data , False )
# enforce performance
self . assertTrue ( abs ( res [ ' roll_error_mean ' ] ) < 5.0 , str ( res ) )
self . assertTrue ( abs ( res [ ' pitch_error_mean ' ] ) < 5.0 , str ( res ) )
self . assertTrue ( abs ( res [ ' yaw_error_mean ' ] ) < 5.0 , str ( res ) )
self . assertTrue ( res [ ' roll_error_std ' ] < 5.0 , str ( res ) )
self . assertTrue ( res [ ' pitch_error_std ' ] < 5.0 , str ( res ) )
self . assertTrue ( res [ ' yaw_error_std ' ] < 5.0 , str ( res ) )
if __name__ == ' __main__ ' :
import rostest
rospy . init_node ( ' test_node ' , anonymous = True )
name = " mavros_mission_test "
if len ( sys . argv ) > 1 :
name + = " - %s " % sys . argv [ 1 ]
rostest . rosrun ( PKG , name , MavrosMissionTest )