@ -3,12 +3,14 @@
from __future__ import print_function
from __future__ import print_function
import argparse
import argparse
import os
import codecs
import codecs
import re
import colorsys
import colorsys
import json
import json
import logging
import os
import re
import sys
import sys
from typing import Optional , Set , Tuple
parser = argparse . ArgumentParser (
parser = argparse . ArgumentParser (
@ -20,13 +22,18 @@ parser.add_argument('-s', '--src-path', action='append',
parser . add_argument ( ' -e ' , ' --exclude-path ' , action = ' append ' ,
parser . add_argument ( ' -e ' , ' --exclude-path ' , action = ' append ' ,
help = ' Excluded path(s), can be specified multiple times ' ,
help = ' Excluded path(s), can be specified multiple times ' ,
default = [ ] )
default = [ ] )
parser . add_argument ( ' --merge-depends ' , action = ' store_true ' ,
help = ' Merge library topics inte the modules that depend on them. ' )
parser . add_argument ( ' -v ' , ' --verbosity ' , action = ' count ' ,
help = ' increase output verbosity; primarily for debugging; repeat for more detail ' ,
default = 0 )
parser . add_argument ( ' -f ' , ' --file ' , metavar = ' file ' , action = ' store ' ,
parser . add_argument ( ' -f ' , ' --file ' , metavar = ' file ' , action = ' store ' ,
help = ' output file name prefix ' ,
help = ' output file name prefix ' ,
default = ' graph ' )
default = ' graph ' )
parser . add_argument ( ' -o ' , ' --output ' , metavar = ' output ' , action = ' store ' ,
parser . add_argument ( ' -o ' , ' --output ' , metavar = ' output ' , action = ' store ' ,
help = ' output format (json or graphviz) ' ,
help = ' output format (json or graphviz) ' ,
default = ' json ' )
default = ' json ' )
parser . add_argument ( ' --use-topic-union ' , action = ' store_true ' ,
parser . add_argument ( ' -u ' , ' - -use-topic-union ' , action = ' store_true ' ,
help = '''
help = '''
Use the union of all publication and subscription topics ( useful for complete
Use the union of all publication and subscription topics ( useful for complete
graphs or only few / single module ( s ) ) . The default is to use the intersection
graphs or only few / single module ( s ) ) . The default is to use the intersection
@ -36,14 +43,9 @@ parser.add_argument('-m', '--modules', action='store',
' MAIN, e.g. from a startup script) ' ,
' MAIN, e.g. from a startup script) ' ,
default = ' ' )
default = ' ' )
args = parser . parse_args ( )
g_debug = False
def dbg_print ( string ) :
if g_debug :
print ( string )
logging . basicConfig ( level = logging . WARNING )
log = logging . getLogger ( )
def get_N_colors ( N , s = 0.8 , v = 0.9 ) :
def get_N_colors ( N , s = 0.8 , v = 0.9 ) :
""" get N distinct colors as a list of hex strings """
""" get N distinct colors as a list of hex strings """
@ -59,7 +61,7 @@ class PubSub(object):
""" Collects either publication or subscription information for nodes
""" Collects either publication or subscription information for nodes
( modules and topics ) & edges """
( modules and topics ) & edges """
def __init__ ( self , is_publicatio n, topic_blacklist , orb_pub_sub_ regexes , special_cas es) :
def __init__ ( self , name , topic_blacklist , regexes ) :
"""
"""
: param is_publication : if True , publications , False for
: param is_publication : if True , publications , False for
subscriptions
subscriptions
@ -68,189 +70,216 @@ class PubSub(object):
( e . g . orb_subscribe ) . They need to have 2 captures , the second
( e . g . orb_subscribe ) . They need to have 2 captures , the second
one is the one capturing ORB_ID ( < topic >
one is the one capturing ORB_ID ( < topic >
"""
"""
self . _module_pubsubs = { } # key = module name, value = set of topic names
self . _name = name
self . _special_cases = special_cases
self . _special_cases_matched = None
self . _topic_blacklist = topic_blacklist
self . _topic_blacklist = topic_blacklist
self . _orb_pub_sub_regexes = orb_pub_sub_regexes
self . _regexes = set ( [ re . compile ( regex ) for regex in regexes ] )
if is_publication :
self . _method = ' Publication '
else :
self . _method = ' Subscription '
def reset ( self ) :
self . _special_cases_matched = [ False ] * len ( self . _special_cases )
def filter_modules ( self , module_whitelist ) :
remove = [ k for k in self . _module_pubsubs if k not in module_whitelist ]
for k in remove : del self . _module_pubsubs [ k ]
def check_if_match_found ( self , modules ) :
""" check if all special cases got a match (if not, it means the source
code got changed )
"""
for i , ( module_match , file_match_re , src_match_re , _ ) in enumerate ( self . _special_cases ) :
if module_match in modules and src_match_re is not None :
if not self . _special_cases_matched [ i ] :
raise Exception ( ' Module ' + module_match +
' : no match for ' + self . _method + ' special case ' +
src_match_re . pattern + ' . The case needs to be updated ' )
def match ( self , source_line : str ) - > Set [ str ] :
def extract ( self , file_name , src_str , module , orb_id_vehicle_attitude_controls_topic ) :
""" Extract subscribed/published topics from a source string
""" Extract subscribed/published topics from a source string
: param src_str : string of C / C + + code with comments and whitespace removed
: param src_str : string of C / C + + code with comments and whitespace removed
: return : if any topic was found , it is returned as a str . Otherwise , None
"""
"""
orb_pubsub_matches = [ ]
matches = set ( )
for regex in self . _orb_pub_sub_regexes :
for regex in self . _regexes :
orb_pubsub_matches + = re . findall ( regex , src_str )
# just the matches for this particular pattern:
match = regex . search ( source_line )
if match is None :
continue
# # all regexes should contain 3 capture groups (or else this code block crashes)
# total_match = match.group(0)
route_group , topic_group = match . groups ( )
orb_id = ' ORB_ID( '
log . debug ( f " ####: { self . _name } : { route_group } , { topic_group } " )
for _ , match in orb_pubsub_matches :
# # TODO: handle this case... but not sure where, yet
if match == ' ORB_ID_VEHICLE_ATTITUDE_CONTROLS ' : # special case
# if match == 'ORB_ID_VEHICLE_ATTITUDE_CONTROLS': # special case
match = orb_id + orb_id_vehicle_attitude_controls_topic
# match = orb_id+ orb_id_vehicle_attitude_controls_topic
# match has the form: '[ORB_ID(]<topic_name>'
# match has the form: '[ORB_ID(]<topic_name>'
if match . startswith ( orb_id ) :
if route_group :
topic_name = match [ len ( orb_id ) : ]
if route_group == ' ORB_ID ' :
self . _add_topic ( topic_name , file_name , module )
log . debug ( " >>> Found ORB_ID topic: " + topic_group + " w/regex: " + str ( regex . pattern ) )
self . _add_topic ( matches , topic_group )
break
elif route_group == ' < ' and topic_group . endswith ( ' _s ' ) :
topic_group = topic_group [ : - 2 ]
log . debug ( " >>> Found C++ template-declaration: " + topic_group + " w/regex: " + str ( regex . pattern ) )
self . _add_topic ( matches , topic_group )
# continue processing
elif route_group in [ ' { ' , ' ( ' ] and topic_group . endswith ( ' _s ' ) :
topic_group = topic_group [ : - 2 ]
log . debug ( " >>> Found standard declaration: " + topic_group + " w/regex: " + str ( regex . pattern ) )
self . _add_topic ( matches , topic_group )
break
elif route_group == ' [ ' :
if topic_group . endswith ( ' _s ' ) :
topic_group = topic_group [ : - 2 ]
log . debug ( " >>> Found array declaration: " + topic_group + " w/regex: " + str ( regex . pattern ) )
self . _add_topic ( matches , topic_group )
break
else :
else :
ignore_found = False
# no topic found -- ambiguity -- return an empty set
for module_match , file_match_re , _ , ignore_re in self . _special_cases :
return set ( )
if module == module_match :
elif ' Multi ' in route_group and topic_group . endswith ( ' _s ' ) :
if file_match_re . search ( file_name ) :
topic_group = topic_group [ : - 2 ]
if ignore_re . search ( match ) :
log . debug ( " >>> Found ' multi ' declaration: " + topic_group + " w/regex: " + str ( regex . pattern ) )
ignore_found = True
self . _add_topic ( matches , topic_group )
if not ignore_found :
break
# If we land here, we need to add another special case
raise Exception ( self . _method + ' w/o ORB_ID(): ' + match + ' in '
+ file_name + ' ( ' + module + ' ). You need to add another special case. ' )
# handle special cases
for i , ( module_match , file_match_re , src_match_re , _ ) in enumerate ( self . _special_cases ) :
if src_match_re is None :
continue
if module == module_match :
if file_match_re . search ( file_name ) :
matches = src_match_re . findall ( src_str )
for match in matches :
# match has the form: '[ORB_ID(]<topic_name>'
if match . startswith ( orb_id ) :
topic_name = match [ len ( orb_id ) : ]
dbg_print ( ' Found ' + self . _method + ' for special case in '
+ module + ' : ' + topic_name )
self . _add_topic ( topic_name , file_name , module )
self . _special_cases_matched [ i ] = True
else :
else :
# this is not fatal, as it could be a method delaration/definition
raise SyntaxError ( ' !!! Encountered regex case: `route_group` contains unrecognized value!: ' + route_group + ' (:: ' + str ( regex . pattern ) + ' ) \n '
dbg_print ( ' Special case ' + self . _method + ' w/o ORB_ID(): '
+ " ( " + route_group + ' , ' + topic_group + " ) \n "
+ match + ' in ' + file_name + ' ( ' + module + ' ) ' )
+ " " + source_line )
def _add_topic ( self , topic_name , file_name , module ) :
else :
""" add a subscription/publication for a module
raise SyntaxError ( " !!! unhandled case: unknown-variant: " + route_group + " , " + topic_group + " ....from regex: " + str ( regex . pattern ) )
"""
return matches
def _add_topic ( self , topic_set : Set [ str ] , topic_name : str ) :
""" add topic to set, unless the topic is ignored """
if topic_name in self . _topic_blacklist :
if topic_name in self . _topic_blacklist :
dbg_print ( ' ignoring blacklisted topic ' + topic_name )
log . debug ( " XX Ignoring blacklisted topic " + topic_name )
return
return
else :
return topic_set . add ( topic_name )
if module is None :
class Publications ( PubSub ) :
if not file_name . endswith ( ' hott/messages.cpp ' ) : # hott has a special module structure. just ignore it
""" Collects topic publication information for scopes """
print ( ' Warning: found ' + self . _method + ' without associated module: '
+ topic_name + ' in ' + file_name )
return
if not module in self . _module_pubsubs :
def __init__ ( self , topic_blacklist , regexes ) :
self . _module_pubsubs [ module ] = set ( )
super ( ) . __init__ ( ' PUB ' , topic_blacklist , regexes )
self . _module_pubsubs [ module ] . add ( topic_name )
def get_topics ( self , modules ) :
class Subscriptions ( PubSub ) :
""" get the set of topics
""" Collects topic subscription information for scopes """
: param modules : list of modules to take into account
"""
def __init__ ( self , topic_blacklist , regexes ) :
topics = set ( )
super ( ) . __init__ ( ' SUB ' , topic_blacklist , regexes )
for module in modules :
if module in self . _module_pubsubs :
topics | = self . _module_pubsubs [ module ]
class Ambiguities ( PubSub ) :
return topics
""" Collects topic information that cannot be classified """
def __init__ ( self , topic_blacklist , regexes ) :
super ( ) . __init__ ( ' AMB ' , topic_blacklist , regexes )
class Scope ( object ) :
""" Defines a scope to add dependencies or topics to """
def __init__ ( self , typename , name ) :
self . publications = set ( )
self . subscriptions = set ( )
self . dependencies = set ( )
self . ambiguities = set ( )
self . _name = name
self . _typename = typename
def add_dependency ( self , dependency_name : str ) :
if isinstance ( dependency_name , str ) :
self . dependencies . add ( dependency_name )
def is_empty ( self ) :
return ( 0 == len ( self . publications ) ) and ( 0 == len ( self . subscriptions ) )
@property
def name ( self ) :
return self . _name
def reduce_ambiguities ( self ) - > Set [ str ] :
self . ambiguities = self . ambiguities - self . subscriptions - self . ambiguities
return self . dependencies
@property
@property
def pubsubs ( self ) :
def typename ( self ) :
""" get dict of all publication/subscriptions (key=modules, value=set of
return self . _typename
topic names """
return self . _module_pubsubs
# define these so we can hash these classes in dicts and sets
def __hash__ ( self ) :
return self . _name . __hash__ ( )
def __eq__ ( self , other ) :
if isinstance ( other , str ) :
return self . _name == other
else :
return self . _name == other . _name
class LibraryScope ( Scope ) :
def __init__ ( self , name ) :
super ( ) . __init__ ( ' Library ' , name )
class ModuleScope ( Scope ) :
def __init__ ( self , name ) :
super ( ) . __init__ ( ' Module ' , name )
class Graph ( object ) :
class Graph ( object ) :
""" Collects Node and Edge information by parsing the source tree """
""" Collects Node and Edge information by parsing the source tree """
def __init__ ( self , module_whitelist = [ ] , topic_blacklist = [ ] ) :
self . _current_module = [ ] # stack with current module (they can be nested)
self . _all_modules = set ( ) # set of all found modules
self . _comment_remove_pattern = re . compile (
def __init__ ( self , * * kwargs ) :
r ' //.*?$|/ \ *.*? \ */| \' (?: \\ .|[^ \\ \' ])* \' | " (?: \\ .|[^ \\ " ])* " ' ,
"""
re . DOTALL | re . MULTILINE )
: kwargs :
- scope_whitelist
- scope_blacklist
- topic_blacklist
"""
self . _comment_remove_pattern = re . compile ( r ' //.*?$|/ \ *.*? \ */| \' (?: \\ .|[^ \\ \' ])* \' | " (?: \\ .|[^ \\ " ])* " ' , re . DOTALL | re . MULTILINE )
self . _whitespace_pattern = re . compile ( r ' \ s+ ' )
self . _whitespace_pattern = re . compile ( r ' \ s+ ' )
self . _module_whitelist = module_whitelist
self . _scope_blacklist = set ( kwargs . get ( ' scope_blacklist ' , set ( ) ) )
self . _excluded_paths = [ ]
self . _scope_whitelist = set ( kwargs . get ( ' scope_whitelist ' , set ( ) ) )
self . _orb_id_vehicle_attitude_controls_topic = ' actuator_controls_0 '
self . _path_blacklist = [ ]
self . _orb_id_vehicle_attitude_controls_re = \
re . compile ( r ' \ #define \ s+ORB_ID_VEHICLE_ATTITUDE_CONTROLS \ s+([^,)]+) ' )
self . _module_subscriptions = { } # key = module name, value = set of topic names
self . _topic_blacklist = set ( kwargs . get ( ' topic_blacklist ' , set ( ) ) )
self . _module_publications = { } # key = module name, value = set of topic names
self . _modules = set ( ) # all modules
self . _orb_id_vehicle_attitude_controls_topic = ' actuator_controls_0 '
self . _topics = set ( ) # all topics
self . _orb_id_vehicle_attitude_controls_re = re . compile ( r ' \ #define \ s+ORB_ID_VEHICLE_ATTITUDE_CONTROLS \ s+([^,)]+) ' )
self . _topic_colors = { } # key = topic, value = color (html string)
# handle special cases
self . _warnings = [ ] # list off all ambiguous scan sites
# format: list of tuples with 4 entries:
# - module name to match (module MAIN)
# - regex for file name(s) to match within the module (matched against the full path)
# - regex to extract the topic name: the match must be ORB_ID(<topic_name>
# Note: whitespace is removed from source code, so it does not need to be
# accounted for in the regex.
# If this is None, it will just be ignored
# - regex to ignore matches in the form orb_[subscribe|advertise](<match>
# (the expectation is that the previous matching ORB_ID() will be passed
# to this, so that we can ignore it)
special_cases_sub = [
( ' listener ' , r ' .* ' , None , r ' ^(id)$ ' ) ,
( ' logger ' , r ' .* ' , None , r ' ^(topic|sub \ .metadata|_polling_topic_meta)$ ' ) ,
]
self . _current_scope = [ ] # stack with current module (they can be nested)
special_cases_sub = [ ( a , re . compile ( b ) , re . compile ( c ) if c is not None else None , re . compile ( d ) )
for a , b , c , d in special_cases_sub ]
self . _subscriptions = PubSub ( False , topic_blacklist ,
self . _found_modules = { } # dict of all found modules
[ r " \ borb_subscribe(_multi|) \ b \ (([^,)]+) " ] ,
self . _found_libraries = { } # dict of all found modules
special_cases_sub )
self . _print_nodes = set ( ) # combination of libraries + modules
self . _print_topics = set ( ) # all topics
self . _topic_colors = { } # key = topic, value = color (html string)
special_cases_pub = [
# note: the source-file-string is pre-processed to remove whitespace -- regexes should ignore whitespace
( ' replay ' , r ' Replay \ .cpp$ ' , None , r ' ^sub \ .orb_meta$ ' ) ,
# note: the regexes should have at least 3 capture groups '()'; otherwise they break downstream code
capture_cases_sub = [ r " \ borb_subscribe(?:_multi|) \ b \ s* \ ( \ s*(ORB_ID) \ s* \ ( \ s*( \ w+) " ,
( ' uavcan ' , r ' sensors/.* \ .cpp$ ' , None , r ' ^_orb_topic$ ' ) ,
r " (?:uORB::)Subscription(?:Interval|) \ s+ \ w+ \ s*[ \ { \ (] \ s*(ORB_ID) \ s* \ ( \ s*( \ w+) " ,
r " (?:uORB::)Subscription(?:Data|MultiArray|Blocking|) \ s*(<) \ s*( \ w+) " ,
r " (?:uORB::)SubscriptionCallbackWorkItem \ s+ \ w+ \ s* \ { \ s*this, \ s*(ORB_ID) \ (( \ w+) " ,
]
self . _subscriptions = Subscriptions ( self . _topic_blacklist , capture_cases_sub )
# note: the source-file-string is pre-processed to remove whitespace -- regexes should ignore whitespace
# note: the regexes should have at least 3 capture groups '()'; otherwise they break downstream code
capture_cases_pub = [ r " (?:uORB::)Publication(?:Data|Multi|) \ s*(<)( \ w+)> " ,
r " orb_advertise(?:_multi|_queue|_multi_queue|) \ s* \ ( \ s*(ORB_ID) \ s* \ ( \ s*( \ w+) " ,
r " orb_publish(?:_auto|) \ s* \ ( \ s*(ORB_ID) \ s* \ ( \ s*( \ w+) " ,
r " (?:uORB::)Publication(?:Data|Multi|) \ s*< \ w+> \ s+ \ w+ \ s*[ \ ( \ { ](ORB_ID) \ (( \ w+) "
]
]
special_cases_pub = [ ( a , re . compile ( b ) , re . compile ( c ) if c is not None else None , re . compile ( d ) )
self . _publications = Publications ( self . _topic_blacklist , capture_cases_pub )
for a , b , c , d in special_cases_pub ]
self . _publications = PubSub ( True , topic_blacklist ,
[ r " \ borb_advertise(_multi|_queue|_multi_queue|) \ b \ (([^,)]+) " ,
r " \ borb_publish_auto() \ b \ (([^,)]+) " ] ,
special_cases_pub )
# note: the source-file-string is pre-processed to remove whitespace -- regexes should ignore whitespace
# note: the regexes should have at least 3 capture groups '()'; otherwise they break downstream code
capture_cases_ambiguous = [ r " orb_copy \ s* \ ( \ s*(ORB_ID) \ s* \ ( \ s*( \ w+) " ,
r " (?:uORB::)Subscription \ s+ \ w+ \ s*( \ [) \ s* \ w+ \ s* \ ]() " ,
r " (ORB_ID) \ s* \ ( \ s*( \ w+) " ,
]
self . _ambiguities = Ambiguities ( self . _topic_blacklist , capture_cases_ambiguous )
def _get_current_module ( self ) :
def _get_current_scop e ( self ) :
if len ( self . _current_module ) == 0 :
if len ( self . _current_scop e ) == 0 :
return None
return None
return self . _current_module [ - 1 ]
return self . _current_scop e [ - 1 ]
def build ( self , src_path_list , excluded_paths = [ ] , use_topic_pubsub_union = True ) :
def build ( self , src_path_list , path_blackli st = [ ] , * * kwargs ) :
""" parse the source tree & extract pub/sub information.
""" parse the source tree & extract pub/sub information.
: param use_topic_pubsub_union : if true , use all topics that have a
: param use_topic_pubsub_union : if true , use all topics that have a
publisher or subscriber . If false , use only topics with at least one
publisher or subscriber . If false , use only topics with at least one
@ -259,50 +288,114 @@ class Graph(object):
fill in self . _module_subsciptions & self . _module_publications
fill in self . _module_subsciptions & self . _module_publications
"""
"""
self . _subscriptions . reset ( )
self . _path_blacklist = [ os . path . normpath ( p ) for p in path_blacklist ]
self . _publications . reset ( )
self . _excluded_paths = [ os . path . normpath ( p ) for p in excluded_paths ]
for path in src_path_list :
for path in src_path_list :
self . _build_recursive ( path )
log . info ( " ## Add src path: " + path )
self . _build_recursive ( path , * * kwargs )
# filter by whitelist
if len ( self . _module_whitelist ) > 0 :
# Summarize the found counts: (all topics are defined in 'dependency' library)
self . _subscriptions . filter_modules ( self . _module_whitelist )
log . info ( ' ### Summary: Total Scanned: ' )
self . _publications . filter_modules ( self . _module_whitelist )
log . info ( ' Libraries Count: ' + str ( len ( self . _found_libraries ) ) )
log . info ( ' Modules Count: ' + str ( len ( self . _found_modules ) ) )
# modules & topics sets
log . info ( ' Warnings Count: ' + str ( len ( self . _warnings ) ) )
self . _modules = set ( list ( self . _publications . pubsubs . keys ( ) ) +
list ( self . _subscriptions . pubsubs . keys ( ) ) )
if kwargs [ ' merge_depends ' ] :
print ( ' number of modules: ' + str ( len ( self . _modules ) ) )
graph . merge_depends ( )
self . _topics = self . _get_topics ( use_topic_pubsub_union = use_topic_pubsub_union )
print ( ' number of topics: ' + str ( len ( self . _topics ) ) )
# filter all scopes, topics into only the scopes + topics to output
self . _generate_print_lists ( use_topic_pubsub_union = kwargs [ ' use_topic_pubsub_union ' ] , merge_depends = kwargs [ ' merge_depends ' ] )
# Summarize the found counts:
print ( ' ### Summary (in-scope): ' )
print ( ' Ambiguous Count: ' + str ( len ( self . _print_ambiguities ) ) )
print ( ' Scope Count: ' + str ( len ( self . _print_scopes ) ) )
print ( ' Topics Count: ' + str ( len ( self . _print_topics ) ) )
print ( ' Warnings Count: ' + str ( len ( self . _warnings ) ) )
if 0 < len ( self . _warnings ) :
# print out the list of warning-sites:
log . info ( ' ## Warning Sites: ' )
for w in self . _warnings :
# warnings tuple contains: (current_scope, file_name, line_number, line)
log . info ( " -[ ' {} ' ]: {:<64s} : {} = {} " . format ( w [ 0 ] . name , w [ 1 ] . lstrip ( ' /. ' ) , w [ 2 ] , w [ 3 ] ) )
# initialize colors
# initialize colors
color_list = get_N_colors ( len ( self . _topics ) , 0.7 , 0.85 )
color_list = get_N_colors ( len ( self . _print_ topics ) , 0.7 , 0.85 )
self . _topic_colors = { }
self . _topic_colors = { }
for i , topic in enumerate ( self . _topics ) :
for i , topic in enumerate ( self . _print_ topics ) :
self . _topic_colors [ topic ] = color_list [ i ]
self . _topic_colors [ topic ] = color_list [ i ]
# validate that all special rules got used
def _generate_print_lists ( self , use_topic_pubsub_union , merge_depends ) :
self . _subscriptions . check_if_match_found ( self . _all_modules )
""" generate the set of scopes (modules + libraries) and topics to print to output """
self . _publications . check_if_match_found ( self . _all_modules )
subscribed_topics = set ( )
published_topics = set ( )
ambiguous_topics = set ( )
def _get_topics ( self , use_topic_pubsub_union = True ) :
# gather all possible modules...
""" get the set of topics
# all_scopes = self._found_libraries | self._found_modules # Python 3.9 or greater
"""
all_scopes = { * * self . _found_libraries , * * self . _found_modules } # Python 3.5 or greater
subscribed_topics = self . _subscriptions . get_topics ( self . _modules )
published_topics = self . _publications . get_topics ( self . _modules )
if use_topic_pubsub_union :
return subscribed_topics | published_topics
return subscribed_topics & published_topics
def _build_recursive ( self , path ) :
if os . path . normpath ( path ) in self . _excluded_paths :
if 0 == len ( self . _scope_whitelist ) :
dbg_print ( ' ignoring excluded path ' + path )
select_scopes = self . _found_modules
else :
select_scopes = { }
for scope_name in self . _scope_whitelist :
if scope_name in all_scopes :
select_scopes [ scope_name ] = all_scopes [ scope_name ]
if not isinstance ( select_scopes , dict ) or 0 == len ( select_scopes ) :
raise TypeError ( " ' select_scopes ' should be a set!! aborting. " )
log . debug ( f ' >> Condensing found topics: scope -> total ' )
for name , scope in select_scopes . items ( ) :
log . debug ( f ' @@ Scope: { name } ' )
log . debug ( f ' ## Subs: { name } ' )
for topic in scope . subscriptions :
log . debug ( f ' - { topic } ' )
subscribed_topics . add ( topic )
log . debug ( f ' ## Pubs: { name } ' )
for topic in scope . publications :
log . debug ( f ' - { topic } ' )
published_topics . add ( topic )
scope . reduce_ambiguities ( )
log . debug ( f ' ## Ambiguities: { name } ' )
for topic in scope . ambiguities :
log . debug ( f ' - { topic } ' )
ambiguous_topics . add ( topic )
# filter modules iff they have at least a subscription or a publication
scopes_with_topic = { }
for name , scope in select_scopes . items ( ) :
if not scope . is_empty ( ) :
scopes_with_topic [ name ] = scope
self . _print_ambiguities = ambiguous_topics
if use_topic_pubsub_union :
self . _print_topics = subscribed_topics | published_topics
self . _print_scopes = scopes_with_topic
else :
self . _print_topics = subscribed_topics & published_topics
# cull scopes to only those that pub or sub to a topic that has both
intersect_scopes = { }
for name , scope in scopes_with_topic . items ( ) :
all_scope_topics = scope . publications | scope . subscriptions
for topic in all_scope_topics :
if topic in self . _print_topics :
intersect_scopes [ scope . name ] = scope
break
self . _print_scopes = intersect_scopes
def _build_recursive ( self , path , * * kwargs ) :
if os . path . normpath ( path ) in self . _path_blacklist :
log . debug ( ' ignoring excluded path ' + path )
return
return
entries = os . listdir ( path )
entries = os . listdir ( path )
@ -311,19 +404,18 @@ class Graph(object):
cmake_file = ' CMakeLists.txt '
cmake_file = ' CMakeLists.txt '
new_module = False
new_module = False
if cmake_file in entries :
if cmake_file in entries :
new_module = self . _extract_module_name ( os . path . join ( path , cmake_file ) )
new_module = self . _extract_build_information ( os . path . join ( path , cmake_file ) , * * kwargs )
# iterate directories recursively
# iterate directories recursively
for entry in entries :
for entry in entries :
file_name = os . path . join ( path , entry )
file_name = os . path . join ( path , entry )
if os . path . isdir ( file_name ) :
if os . path . isdir ( file_name ) :
self . _build_recursive ( file_name )
self . _build_recursive ( file_name , * * kwargs )
# iterate source files
# iterate source files
# Note: we could skip the entries if we're not in a module, but we don't
# Note: Skip all entries if we're not in a module -- both finding known pubs/subs and emitting warnings
# so that we get appropriate error messages to know where we miss subs
if ( 0 == len ( self . _scope_whitelist ) ) or ( 0 < len ( self . _current_scope ) ) and ( self . _current_scope [ - 1 ] . name in self . _scope_whitelist ) :
# or pubs
for entry in entries :
for entry in entries :
file_name = os . path . join ( path , entry )
file_name = os . path . join ( path , entry )
if os . path . isfile ( file_name ) :
if os . path . isfile ( file_name ) :
@ -333,31 +425,68 @@ class Graph(object):
if new_module :
if new_module :
self . _current_modul e . pop ( )
self . _current_scop e . pop ( )
def _extract_module_name ( self , file_name ) :
def _extract_build_information ( self , file_name , * * kwargs ) :
""" extract the module name from a CMakeLists.txt file and store
""" extract the module name from a CMakeLists.txt file and store
in self . _current_module if there is any """
in self . _current_scope if there is any """
datafile = open ( file_name )
datafile = open ( file_name )
found_module_def = False
found_module_def = False
found_module_depends = False
found_library_def = False
for line in datafile :
for line in datafile :
if ' px4_add_module ' in line : # must contain 'px4_add_module'
if ' px4_add_module ' in line : # must contain 'px4_add_module'
found_module_def = True
found_module_def = True
elif ' px4_add_library ' in line : # must contain 'px4_add_module'
tokens = line . split ( ' ( ' )
if 1 < len ( tokens ) :
found_library_def = True
library_name = tokens [ 1 ] . split ( ) [ 0 ] . strip ( ) . rstrip ( ' ) ' )
library_scope = LibraryScope ( library_name )
self . _current_scope . append ( library_scope )
self . _found_libraries [ library_name ] = library_scope
if self . _in_scope ( ) :
log . debug ( ' >> found library: ' + library_name )
# we can return early because we have no further information to collect from libraries
return True
elif found_module_def and ' DEPENDS ' in line . upper ( ) :
found_module_depends = True
elif found_module_depends :
# two tabs is a *sketchy* heuristic -- spacing isn't guaranteed by cmake;
# ... but the hard-tabs *is* specified by PX4 coding standards, so it's likely to be consistent
if line . startswith ( ' \t \t ' ) :
depends = [ dep . strip ( ) for dep in line . split ( ) ]
for name in depends :
self . _current_scope [ - 1 ] . add_dependency ( name )
if kwargs [ ' merge_depends ' ] :
if ( 0 < len ( self . _scope_whitelist ) ) and self . _current_scope [ - 1 ] . name in self . _scope_whitelist :
# if we whitelist a module with dependencies, whitelist the dependencies, too
self . _scope_whitelist . add ( name )
else :
found_module_depends = False ## done with the 'DEPENDS' section.
words = line . split ( )
words = line . split ( )
# get the definition of MAIN
# get the definition of MAIN
if found_module_def and ' MAIN ' in words and len ( words ) > = 2 :
if found_module_def and ' MAIN ' in words and len ( words ) > = 2 :
self . _current_module . append ( words [ 1 ] )
module_name = words [ 1 ]
self . _all_modules . add ( words [ 1 ] )
module_scope = ModuleScope ( module_name )
dbg_print ( ' Found module name: ' + words [ 1 ] )
self . _current_scope . append ( module_scope )
return True
self . _found_modules [ module_name ] = module_scope
return False
if self . _in_scope ( ) :
log . debug ( ' >> Found module name: ' + module_scope . name )
return ( found_library_def or found_module_def )
def _process_source_file ( self , file_name ) :
def _process_source_file ( self , file_name ) :
""" extract information from a single source file """
""" extract information from a single source file """
log . debug ( " >> extracting topics from file: " + file_name )
with codecs . open ( file_name , ' r ' , ' utf-8 ' ) as f :
with codecs . open ( file_name , ' r ' , ' utf-8 ' ) as f :
try :
try :
content = f . read ( )
content = f . read ( )
@ -366,10 +495,15 @@ class Graph(object):
return
return
current_module = self . _get_current_module ( )
current_scope = self . _get_current_scope ( )
if current_module == ' uorb_tests ' : # skip this
if current_scope is None :
return # ignore declarations outside of a declared module
elif current_scope . name in self . _scope_blacklist :
return
return
if current_module == ' uorb ' :
elif current_scope . name == ' uorb_tests ' : # skip this
return
elif current_scope . name == ' uorb ' :
# search and validate the ORB_ID_VEHICLE_ATTITUDE_CONTROLS define
# search and validate the ORB_ID_VEHICLE_ATTITUDE_CONTROLS define
matches = self . _orb_id_vehicle_attitude_controls_re . findall ( content )
matches = self . _orb_id_vehicle_attitude_controls_re . findall ( content )
@ -383,66 +517,71 @@ class Graph(object):
return # skip uorb module for the rest
return # skip uorb module for the rest
line_number = 0
for line in content . splitlines ( ) :
line_number + = 1
pub_topics = self . _publications . match ( line )
for each_topic in pub_topics :
current_scope . publications . add ( each_topic )
if pub_topics :
continue
if content . lower ( ) . find ( ' orb_ ' ) != - 1 : # approximative filter to quickly
sub_topics = self . _subscriptions . match ( line )
# discard files we're not interested in
for each_topic in sub_topics :
# (speedup the parsing)
current_scope . subscriptions . add ( each_topic )
src = self . _comment_remover ( content )
if sub_topics :
src = re . sub ( self . _whitespace_pattern , ' ' , src ) # remove all whitespace
continue
# subscriptions
self . _subscriptions . extract ( file_name , src , current_module ,
self . _orb_id_vehicle_attitude_controls_topic )
# publications
ambi_topics = self . _ambiguities . match ( line )
self . _publications . extract ( file_name , src , current_module ,
for each_topic in ambi_topics :
self . _orb_id_vehicle_attitude_controls_topic )
current_scope . ambiguities . add ( each_topic )
self . _warnings . append ( ( current_scope , file_name , line_number , line ) )
# TODO: handle Publication & Subscription template classes
def _in_scope ( self , scope_name = None ) :
if 0 == len ( self . _scope_whitelist ) :
return True
elif 0 < len ( self . _current_scope ) :
if None is scope_name :
scope_name = self . _current_scope [ - 1 ] . name
if scope_name in self . _scope_whitelist :
return True
return False
def merge_depends ( self ) :
for modname , module in self . _found_modules . items ( ) :
if self . _in_scope ( modname ) :
for depname in module . dependencies :
if depname in self . _found_libraries :
dep = self . _found_libraries [ depname ]
# copy topics from library to depending library
for topic in dep . publications :
module . publications . add ( topic )
for topic in dep . subscriptions :
module . subscriptions . add ( topic )
for topic in dep . ambiguities :
module . ambiguities
def _comment_remover ( self , text ) :
# omit all libraries -- they've already been merged into their respective dependees
""" remove C++ & C style comments.
self . _scope_whitelist = set ( [ str ( s ) for s in self . _scope_whitelist if s not in self . _found_libraries ] )
Source : https : / / stackoverflow . com / a / 241506 """
def replacer ( match ) :
s = match . group ( 0 )
if s . startswith ( ' / ' ) :
return " " # note: a space and not an empty string
else :
return s
return re . sub ( self . _comment_remove_pattern , replacer , text )
@property
@property
def modul es( self ) :
def output_scopes ( self ) :
""" get the set of all modules """
""" get the set of all modules """
return self . _modul es
return self . _print_scopes
@property
@property
def topics ( self ) :
def output_ topics( self ) :
""" get set set of all topics """
""" get set set of all topics """
return self . _topics
return self . _print_ topics
@property
@property
def topic_colors ( self ) :
def topic_colors ( self ) :
""" get a dict of all topic colors with key=topic, value=color """
""" get a dict of all topic colors with key=topic, value=color """
return self . _topic_colors
return self . _topic_colors
@property
def module_subscriptions ( self ) :
""" get a dict of all subscriptions with key=module name, value=set(topic names) """
return self . _subscriptions . pubsubs
@property
def module_publications ( self ) :
""" get a dict of all publications with key=module name, value=set(topic names) """
return self . _publications . pubsubs
class OutputGraphviz ( object ) :
class OutputGraphviz ( object ) :
""" write graph using Graphviz """
""" write graph using Graphviz """
@ -463,44 +602,40 @@ class OutputGraphviz(object):
ratio = 1 # aspect ratio
ratio = 1 # aspect ratio
module s = self . _graph . module s
output_topic s = self . _graph . output_topic s
topic s = self . _graph . topic s
output_scope s = self . _graph . output_scope s
topic_colors = self . _graph . topic_colors
topic_colors = self . _graph . topic_colors
module_publications = self . _graph . module_publications
module_subscriptions = self . _graph . module_subscriptions
graph_attr = { ' splines ' : ' true ' , ' ratio ' : str ( ratio ) , ' overlap ' : ' false ' }
graph_attr = { ' splines ' : ' true ' , ' ratio ' : str ( ratio ) , ' overlap ' : ' false ' }
graph_attr [ ' sep ' ] = ' " +15,15 " ' # increase spacing between nodes
graph_attr [ ' sep ' ] = ' " +15,15 " ' # increase spacing between nodes
graph = Digraph ( comment = ' autogenerated graph with graphviz using uorb_graph.py ' ,
graph = Digraph ( comment = ' autogenerated graph with graphviz using uorb_graph.py ' ,
engine = engine , graph_attr = graph_attr )
engine = engine , graph_attr = graph_attr )
# scopes: modules
# nodes
log . info ( ' > Writing scopes ' )
for modul e in m od ul es:
for na me, _ in output_scopes . it em s ( ) :
graph . node ( ' m_ ' + modul e , modul e , shape = ' box ' , fontcolor = ' #ffffff ' ,
graph . node ( ' m_ ' + na me, na me, shape = ' box ' , fontcolor = ' #ffffff ' ,
style = ' filled ' , color = ' #666666 ' , fontsize = ' 16 ' )
style = ' filled ' , color = ' #666666 ' , fontsize = ' 16 ' )
for topic in topics :
log . info ( ' > Writing topics ' )
for topic in output_topics :
graph . node ( ' t_ ' + topic , topic , shape = ' ellipse ' , fontcolor = ' #ffffff ' ,
graph . node ( ' t_ ' + topic , topic , shape = ' ellipse ' , fontcolor = ' #ffffff ' ,
style = ' filled ' , color = topic_colors [ topic ] )
style = ' filled ' , color = topic_colors [ topic ] )
# edges
# edges
log . info ( ' > Writing publish edges ' )
if show_publications :
if show_publications :
for module in modules :
for scope_name , scope in output_scopes . items ( ) :
if module in module_publications :
for topic in scope . publications :
for topic in module_publications [ module ] :
if topic in output_topics :
if topic in topics :
graph . edge ( ' m_ ' + scope_name , ' t_ ' + topic , color = topic_colors [ topic ] , style = ' dashed ' )
graph . edge ( ' m_ ' + module , ' t_ ' + topic ,
color = topic_colors [ topic ] , style = ' dashed ' )
log . info ( ' > Writing subscribe edges ' )
if show_subscriptions :
if show_subscriptions :
for module in modules :
for scope_name , scope in output_scopes . items ( ) :
if module in module_subscriptions :
for topic in scope . subscriptions :
for topic in module_subscriptions [ module ] :
if topic in output_topics :
if topic in topics :
graph . edge ( ' t_ ' + topic , ' m_ ' + scope_name , color = topic_colors [ topic ] )
graph . edge ( ' t_ ' + topic , ' m_ ' + module ,
color = topic_colors [ topic ] )
graph . render ( file_name , view = False )
graph . render ( file_name , view = False )
@ -515,11 +650,9 @@ class OutputJSON(object):
print ( ' Writing to ' + file_name )
print ( ' Writing to ' + file_name )
module s = self . _graph . module s
output_topic s = self . _graph . output_topic s
topic s = self . _graph . topic s
output_scope s = self . _graph . output_scope s
topic_colors = self . _graph . topic_colors
topic_colors = self . _graph . topic_colors
module_publications = self . _graph . module_publications
module_subscriptions = self . _graph . module_subscriptions
data = { }
data = { }
nodes = [ ]
nodes = [ ]
@ -528,16 +661,16 @@ class OutputJSON(object):
# (sort by length, such that short names are last. The rendering order
# (sort by length, such that short names are last. The rendering order
# will be the same, so that in case of an overlap, the shorter label
# will be the same, so that in case of an overlap, the shorter label
# will be on top)
# will be on top)
for modu le in sorted ( m od ul es, key = len , reverse = True ) :
for scope_tup le in sorted ( output_scopes . it em s ( ) , key = ( lambda st : len ( st [ 0 ] ) ) , reverse = True ) :
node = { }
node = { }
node [ ' id ' ] = ' m_ ' + module
node [ ' id ' ] = ' m_ ' + scope_tuple [ 0 ]
node [ ' name ' ] = module
node [ ' name ' ] = scope_tuple [ 0 ]
node [ ' type ' ] = ' module '
node [ ' type ' ] = scope_tuple [ 1 ] . typename
node [ ' color ' ] = ' #666666 '
node [ ' color ' ] = ' #666666 '
# TODO: add url to open module documentation?
# TODO: add url to open module documentation?
nodes . append ( node )
nodes . append ( node )
for topic in sorted ( topics , key = len , reverse = True ) :
for topic in sorted ( output_ topics, key = len , reverse = True ) :
node = { }
node = { }
node [ ' id ' ] = ' t_ ' + topic
node [ ' id ' ] = ' t_ ' + topic
node [ ' name ' ] = topic
node [ ' name ' ] = topic
@ -552,25 +685,24 @@ class OutputJSON(object):
edges = [ ]
edges = [ ]
# edges
# edges
for module in modules :
for name , scope in output_scopes . items ( ) :
if module in module_publications :
for topic in scope . publications :
for topic in module_publications [ module ] :
if topic in output_topics :
if topic in topics :
edge = { }
edge = { }
edge [ ' source ' ] = ' m_ ' + modul e
edge [ ' source ' ] = ' m_ ' + na me
edge [ ' target ' ] = ' t_ ' + topic
edge [ ' target ' ] = ' t_ ' + topic
edge [ ' color ' ] = topic_colors [ topic ]
edge [ ' color ' ] = topic_colors [ topic ]
edge [ ' style ' ] = ' dashed '
edge [ ' style ' ] = ' dashed '
edges . append ( edge )
edges . append ( edge )
for module in modules :
for name , scope in output_scopes . items ( ) :
if module in module_subscriptions :
for topic in scope . subscriptions :
for topic in module_subscriptions [ module ] :
if topic in output_topics :
if topic in topics :
edge = { }
edge = { }
edge [ ' source ' ] = ' t_ ' + topic
edge [ ' source ' ] = ' t_ ' + topic
edge [ ' target ' ] = ' m_ ' + modul e
edge [ ' target ' ] = ' m_ ' + na me
edge [ ' color ' ] = topic_colors [ topic ]
edge [ ' color ' ] = topic_colors [ topic ]
edge [ ' style ' ] = ' normal '
edge [ ' style ' ] = ' normal '
edges . append ( edge )
edges . append ( edge )
@ -583,21 +715,48 @@ class OutputJSON(object):
if " __main__ " == __name__ :
if " __main__ " == __name__ :
args = parser . parse_args ( )
if 0 < args . verbosity :
if 1 == args . verbosity :
log . setLevel ( logging . INFO )
print ( " set log level to INFO " )
else : # implicity 1<
log . setLevel ( logging . DEBUG )
print ( " set log level to DEBUG " )
# ignore topics that are subscribed/published by many topics, but are not really
# ignore topics that are subscribed/published by many topics, but are not really
# useful to show in the graph
# useful to show in the graph
topic_blacklist = [ ' parameter_update ' , ' mavlink_log ' , ' log_message ' ]
topic_blacklist = [ ' parameter_update ' , ' mavlink_log ' , ' log_message ' ]
print ( ' Excluded topics: ' + str ( topic_blacklist ) )
print ( ' Excluded topics: ' + str ( topic_blacklist ) )
# ignore certain modules; for any reason
scope_blacklist = [ ]
if scope_blacklist :
print ( ' Excluded Modules: ' + str ( topic_blacklist ) )
if len ( args . modules ) == 0 :
if len ( args . modules ) == 0 :
module_whitelist = [ ]
scop e_whitelist = [ ]
else :
else :
module_whitelist = [ m . strip ( ) for m in args . modules . split ( ' , ' ) ]
scope_whitelist = [ m . strip ( ) for m in args . modules . split ( ' , ' ) ]
scope_whitelist = set ( scope_whitelist )
graph = Graph ( module_whitelist = module_white list, topic_blacklist = topic_blacklist )
graph = Graph ( scope_whitelist = scope_whitelist , scope_blacklist = scope_black list, topic_blacklist = topic_blacklist )
if len ( args . src_path ) == 0 :
if len ( args . src_path ) == 0 :
args . src_path = [ ' src ' ]
args . src_path = [ ' src ' ]
graph . build ( args . src_path , args . exclude_path , use_topic_pubsub_union = args . use_topic_union )
if not os . path . exists ( args . src_path [ 0 ] ) :
print ( f " !?could not find source directory: { args . src_path [ 0 ] } " )
script_path = os . path . dirname ( os . path . realpath ( __file__ ) )
args . src_path [ 0 ] = os . path . realpath ( os . path . join ( script_path , ' .. ' , ' .. ' , ' src ' ) )
print ( f " >> guessing at path: { args . src_path [ 0 ] } " )
for path in args . src_path :
if not os . path . exists ( path ) :
print ( f " !?could not find source directory: { path } -- please check path! " )
print ( f " Exiting. " )
graph . build ( args . src_path , args . exclude_path , use_topic_pubsub_union = args . use_topic_union , merge_depends = args . merge_depends )
if args . output == ' json ' :
if args . output == ' json ' :
output_json = OutputJSON ( graph )
output_json = OutputJSON ( graph )
@ -607,7 +766,7 @@ if "__main__" == __name__:
try :
try :
from graphviz import Digraph
from graphviz import Digraph
except ImportError as e :
except ImportError as e :
print ( " Failed to import graphviz: " + e )
print ( " Failed to import graphviz: " + str ( e ) )
print ( " " )
print ( " " )
print ( " You may need to install it with: " )
print ( " You may need to install it with: " )
print ( " pip3 install --user graphviz " )
print ( " pip3 install --user graphviz " )
@ -618,5 +777,7 @@ if "__main__" == __name__:
output_graphviz . write ( args . file + ' .fv ' , engine = engine )
output_graphviz . write ( args . file + ' .fv ' , engine = engine )
output_graphviz . write ( args . file + ' _subs.fv ' , show_publications = False , engine = engine )
output_graphviz . write ( args . file + ' _subs.fv ' , show_publications = False , engine = engine )
output_graphviz . write ( args . file + ' _pubs.fv ' , show_subscriptions = False , engine = engine )
output_graphviz . write ( args . file + ' _pubs.fv ' , show_subscriptions = False , engine = engine )
elif args . output == ' none ' :
pass
else :
else :
print ( ' Error: unknown output format ' + args . output )
print ( ' Error: unknown output format ' + args . output )