#!/usr/bin/env python3
################################################################################
#
# Copyright (c) 2018-2019 PX4 Development Team. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
################################################################################
import sys
import os
import argparse
import errno
import yaml
import re
import difflib
class Classifier ( ) :
"""
Class to classify RTPS msgs as senders , receivers or to be ignored
"""
def __init__ ( self , yaml_file , msg_folder ) :
self . msg_folder = msg_folder
self . all_msgs_list = self . set_all_msgs ( )
self . msg_id_map = self . parse_yaml_msg_id_file ( yaml_file )
self . alias_space_init_id = 150
# Checkers
self . check_if_listed ( yaml_file )
self . check_base_type ( )
self . check_id_space ( )
self . msgs_to_send , self . alias_msgs_to_send = self . set_msgs_to_send ( )
self . msgs_to_receive , self . alias_msgs_to_receive = self . set_msgs_to_receive ( )
self . msgs_to_ignore , self . alias_msgs_to_ignore = self . set_msgs_to_ignore ( )
self . msg_files_send = self . set_msg_files_send ( )
self . msg_files_receive = self . set_msg_files_receive ( )
self . msg_files_ignore = self . set_msg_files_ignore ( )
# setters (for class init)
def set_all_msgs ( self ) :
msg_list = [ ]
for filename in os . listdir ( self . msg_folder ) :
if ' .msg ' in filename :
msg_list . append ( re . sub ( " .msg " , " " , filename ) )
return msg_list
def set_msgs_to_send ( self ) :
send = { }
send_alias = [ ]
for dict in self . msg_id_map [ ' rtps ' ] :
if ' send ' in list ( dict . keys ( ) ) :
if ' alias ' in list ( dict . keys ( ) ) :
send_alias . append (
( { dict [ ' msg ' ] : dict [ ' id ' ] } , dict [ ' alias ' ] ) )
else :
send . update ( { dict [ ' msg ' ] : dict [ ' id ' ] } )
return send , send_alias
def set_msgs_to_receive ( self ) :
receive = { }
receive_alias = [ ]
for dict in self . msg_id_map [ ' rtps ' ] :
if ' receive ' in list ( dict . keys ( ) ) :
if ' alias ' in list ( dict . keys ( ) ) :
receive_alias . append (
( { dict [ ' msg ' ] : dict [ ' id ' ] } , dict [ ' alias ' ] ) )
else :
receive . update ( { dict [ ' msg ' ] : dict [ ' id ' ] } )
return receive , receive_alias
def set_msgs_to_ignore ( self ) :
ignore = { }
ignore_alias = [ ]
for dict in self . msg_id_map [ ' rtps ' ] :
if ( ( ' send ' not in list ( dict . keys ( ) ) ) and ( ' receive ' not in list ( dict . keys ( ) ) ) ) :
if ' alias ' in list ( dict . keys ( ) ) :
ignore_alias . append (
( { dict [ ' msg ' ] : dict [ ' id ' ] } , dict [ ' alias ' ] ) )
else :
ignore . update ( { dict [ ' msg ' ] : dict [ ' id ' ] } )
return ignore , ignore_alias
def set_msg_files_send ( self ) :
return [ os . path . join ( self . msg_folder , msg + " .msg " )
for msg in list ( self . msgs_to_send . keys ( ) ) ]
def set_msg_files_receive ( self ) :
return [ os . path . join ( self . msg_folder , msg + " .msg " )
for msg in list ( self . msgs_to_receive . keys ( ) ) ]
def set_msg_files_ignore ( self ) :
return [ os . path . join ( self . msg_folder , msg + " .msg " )
for msg in list ( self . msgs_to_ignore . keys ( ) ) ]
def check_if_listed ( self , yaml_file ) :
"""
Checks if all msgs are listed under the RTPS ID yaml file
"""
none_listed_msgs = [ ]
for msg in self . all_msgs_list :
result = not any (
dict [ ' msg ' ] == msg for dict in self . msg_id_map [ ' rtps ' ] )
if result :
none_listed_msgs . append ( msg )
if len ( none_listed_msgs ) > 0 :
if len ( none_listed_msgs ) == 1 :
error_msg = " The following message is not listen under "
elif len ( none_listed_msgs ) > 1 :
error_msg = " The following messages are not listen under "
raise AssertionError (
" \n %s %s : " % ( error_msg , yaml_file ) + " , " . join ( ' %s ' % msg for msg in none_listed_msgs ) +
" \n \n Please add them to the yaml file with the respective ID and, if applicable, mark them " +
" to be sent or received by the micro-RTPS bridge. \n "
" NOTE: If the message has multi-topics (#TOPICS), these should be added as well. \n " )
def check_base_type ( self ) :
"""
Check if alias message has correct base type
"""
registered_alias_msgs = list (
dict [ ' alias ' ] for dict in self . msg_id_map [ ' rtps ' ] if ' alias ' in list ( dict . keys ( ) ) )
base_types = [ ]
for dict in self . msg_id_map [ ' rtps ' ] :
if ' alias ' not in list ( dict . keys ( ) ) :
base_types . append ( dict [ ' msg ' ] )
incorrect_base_types = list (
set ( registered_alias_msgs ) - set ( base_types ) )
base_types_suggestion = { }
for incorrect in incorrect_base_types :
base_types_suggestion . update ( { incorrect : difflib . get_close_matches (
incorrect , base_types , n = 1 , cutoff = 0.6 ) } )
if len ( base_types_suggestion ) > 0 :
raise AssertionError (
( ' \n ' + ' \n ' . join ( ' \t - The multi-topic message base type \' {} \' does not exist. {} ' . format ( k , ( ' Did you mean \' ' + v [ 0 ] + ' \' ? ' if v else ' ' ) ) for k , v in list ( base_types_suggestion . items ( ) ) ) ) )
def check_id_space ( self ) :
"""
Check if msg ID is in the correct ID space
"""
incorrect_base_ids = { }
incorrect_alias_ids = { }
for dict in self . msg_id_map [ ' rtps ' ] :
if ' alias ' not in list ( dict . keys ( ) ) and dict [ ' id ' ] > = self . alias_space_init_id :
incorrect_base_ids . update ( { dict [ ' msg ' ] : dict [ ' id ' ] } )
elif ' alias ' in list ( dict . keys ( ) ) and dict [ ' id ' ] < self . alias_space_init_id :
incorrect_alias_ids . update ( { dict [ ' msg ' ] : dict [ ' id ' ] } )
if len ( incorrect_base_ids ) > 0 :
raise AssertionError (
( ' \n ' + ' \n ' . join ( ' \t - The message \' {} with ID \' {} \' is in the wrong ID space. Please use any of the available IDs from 0 to 149 ' . format ( k , v ) for k , v in list ( incorrect_base_ids . items ( ) ) ) ) )
if len ( incorrect_alias_ids ) > 0 :
raise AssertionError (
( ' \n ' + ' \n ' . join ( ' \t - The alias message \' {} \' with ID \' {} \' is in the wrong ID space. Please use any of the available IDs from 149 to 255 ' . format ( k , v ) for k , v in list ( incorrect_alias_ids . items ( ) ) ) ) )
@staticmethod
def parse_yaml_msg_id_file ( yaml_file ) :
"""
Parses a yaml file into a dict
"""
try :
with open ( yaml_file , ' r ' ) as f :
return yaml . safe_load ( f )
except OSError as e :
if e . errno == errno . ENOENT :
raise IOError ( errno . ENOENT , os . strerror (
errno . ENOENT ) , yaml_file )
else :
raise
if __name__ == " __main__ " :
parser = argparse . ArgumentParser ( )
parser . add_argument ( " -s " , " --send " , dest = ' send ' ,
action = " store_true " , help = " Get topics to be sent " )
parser . add_argument ( " -a " , " --alias " , dest = ' alias ' ,
action = " store_true " , help = " Get alias topics " )
parser . add_argument ( " -r " , " --receive " , dest = ' receive ' ,
action = " store_true " , help = " Get topics to be received " )
parser . add_argument ( " -i " , " --ignore " , dest = ' ignore ' ,
action = " store_true " , help = " Get topics to be ignored " )
parser . add_argument ( " -p " , " --path " , dest = ' path ' ,
action = " store_true " , help = " Get msgs and its paths " )
parser . add_argument ( " -m " , " --topic-msg-dir " , dest = ' msgdir ' , type = str ,
help = " Topics message dir, by default msg/ " , default = " msg " )
parser . add_argument ( " -y " , " --rtps-ids-file " , dest = ' yaml_file ' , type = str ,
help = " RTPS msg IDs definition file absolute path, by default use relative path to msg, tools/uorb_rtps_message_ids.yaml " ,
default = ' tools/uorb_rtps_message_ids.yaml ' )
# Parse arguments
args = parser . parse_args ( )
msg_dir = args . msgdir
if args . msgdir == ' msg ' :
msg_dir = os . path . dirname ( os . path . dirname ( os . path . abspath ( __file__ ) ) )
else :
msg_dir = os . path . abspath ( args . msgdir )
classifier = ( Classifier ( os . path . abspath ( args . yaml_file ) , msg_dir ) if os . path . isabs ( args . yaml_file )
else Classifier ( os . path . join ( msg_dir , args . yaml_file ) , msg_dir ) )
if args . send :
if args . path :
print ( ( ' send files: ' + ' , ' . join ( str ( msg_file )
for msg_file in classifier . msgs_files_send ) + ' \n ' ) )
else :
if args . alias :
print ( ( ' , ' . join ( str ( msg )
for msg in list ( classifier . msgs_to_send . keys ( ) ) ) + ( ' alias ' + ' , ' . join ( str ( list ( msg [ 0 ] . keys ( ) ) [ 0 ] )
for msg in classifier . alias_msgs_to_send ) if len ( classifier . alias_msgs_to_send ) > 0 else ' ' ) + ' \n ' ) )
else :
print ( ( ' , ' . join ( str ( msg )
for msg in list ( classifier . msgs_to_send . keys ( ) ) ) ) )
if args . receive :
if args . path :
print ( ( ' receive files: ' + ' , ' . join ( str ( msg_file )
for msg_file in classifier . msgs_files_receive ) + ' \n ' ) )
else :
if args . alias :
print ( ( ' , ' . join ( str ( msg )
for msg in list ( classifier . msgs_to_receive . keys ( ) ) ) + ( ' alias ' + ' , ' . join ( str ( list ( msg [ 0 ] . keys ( ) ) [ 0 ] )
for msg in classifier . alias_msgs_to_receive ) if len ( classifier . alias_msgs_to_receive ) > 0 else ' ' ) + ' \n ' ) )
else :
print ( ( ' , ' . join ( str ( msg )
for msg in list ( classifier . msgs_to_receive . keys ( ) ) ) ) )
if args . ignore :
if args . path :
print ( ( ' ignore files: ' + ' , ' . join ( str ( msg_file )
for msg_file in classifier . msgs_files_ignore ) + ' \n ' ) )
else :
if args . alias :
print ( ( ' , ' . join ( str ( msg )
for msg in list ( classifier . msgs_to_ignore . keys ( ) ) ) + ( ' alias ' + ' , ' . join ( str ( list ( msg [ 0 ] . keys ( ) ) [ 0 ] )
for msg in classifier . alias_msgs_to_ignore ) if len ( classifier . alias_msgs_to_ignore ) > 0 else ' ' ) + ' \n ' ) )
else :
print ( ( ' , ' . join ( str ( msg )
for msg in list ( classifier . msgs_to_ignore . keys ( ) ) ) ) )