You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
298 lines
12 KiB
298 lines
12 KiB
# Software License Agreement (BSD License) |
|
# |
|
# Copyright (c) 2009, Willow Garage, Inc. |
|
# All rights reserved. |
|
# |
|
# Redistribution and use in source and binary forms, with or without |
|
# modification, are permitted provided that the following conditions |
|
# are met: |
|
# |
|
# * Redistributions of source code must retain the above copyright |
|
# notice, this list of conditions and the following disclaimer. |
|
# * Redistributions in binary form must reproduce the above |
|
# copyright notice, this list of conditions and the following |
|
# disclaimer in the documentation and/or other materials provided |
|
# with the distribution. |
|
# * Neither the name of Willow Garage, Inc. nor the names of its |
|
# contributors may be used to endorse or promote products derived |
|
# from this software without specific prior written permission. |
|
# |
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS |
|
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE |
|
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, |
|
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, |
|
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER |
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
# POSSIBILITY OF SUCH DAMAGE. |
|
|
|
import os |
|
import sys |
|
|
|
import random |
|
|
|
def test_bare_msg_type():
    """Check bare_msg_type against type names with and without array suffixes.

    Each expectation pairs an input with the bare type it should map to;
    ``None`` is expected to map to ``None``.
    """
    import genmsg.msgs

    # (input type, expected bare type)
    expectations = (
        (None, None),
        ('String', 'String'),
        ('std_msgs/String', 'std_msgs/String'),
        ('String[10]', 'String'),
        ('string[10]', 'string'),
        ('std_msgs/String[10]', 'std_msgs/String'),
    )
    for raw_type, expected in expectations:
        actual = genmsg.msgs.bare_msg_type(raw_type)
        assert expected == actual
|
|
|
# Package context passed as the second argument to resolve_type() in
# test_resolve_type.
PKG = 'genmsg'
|
|
|
def test_resolve_type():
    """Exercise resolve_type on builtin, Header and package-qualified names.

    The expectations below encode the behaviour under test:

    - the listed builtin types (and their array forms) come back unchanged;
    - a bare ``Header`` resolves to ``std_msgs/Header`` for the package
      contexts tried here;
    - a bare ``String`` is prefixed with the package context;
    - a name that already carries a package is returned as given.
    """
    from genmsg.msgs import resolve_type

    # Builtins should resolve to themselves.  This used to be a bare
    # comparison whose result was thrown away, so nothing was verified.
    for t in ['string', 'string[]', 'string[14]', 'int32', 'int32[]']:
        assert t == resolve_type(t, PKG)

    assert 'foo/string' == resolve_type('foo/string', PKG)
    assert 'std_msgs/Header' == resolve_type('Header', 'roslib')
    assert 'std_msgs/Header' == resolve_type('std_msgs/Header', 'roslib')
    assert 'std_msgs/Header' == resolve_type('Header', 'stereo_msgs')
    assert 'std_msgs/String' == resolve_type('String', 'std_msgs')
    assert 'std_msgs/String' == resolve_type('std_msgs/String', 'std_msgs')
    assert 'std_msgs/String' == resolve_type('std_msgs/String', PKG)
    assert 'std_msgs/String[]' == resolve_type('std_msgs/String[]', PKG)
|
|
|
def test_parse_type():
    """Verify parse_type on well-formed type strings and on malformed input.

    Well-formed strings must yield a 3-tuple of (base type, array flag,
    array length); each malformed input must raise ValueError.
    """
    import genmsg.msgs

    parse = genmsg.msgs.parse_type

    # Every base type is tried with every suffix, in the same order as a
    # hand-written table: all plain types, then '[]', then '[1]', then '[11]'.
    base_types = ('a', 'int8', 'std_msgs/String')
    suffixes = (
        ('', False, None),
        ('[]', True, None),
        ('[1]', True, 1),
        ('[11]', True, 11),
    )
    for suffix, is_array, array_len in suffixes:
        for base in base_types:
            expected = (base, is_array, array_len)
            assert expected == parse(base + suffix)

    malformed = ['a[1][2]', 'a[][]', '', None, 'a[', 'a[[1]', 'a[1]]']
    for bad in malformed:
        try:
            parse(bad)
        except ValueError:
            # rejected as expected; move on to the next input
            continue
        assert False, "should have failed on %s"%bad
|
|
|
def test_Constant():
    """Check construction, equality, string forms and validation of Constant.

    NOTE(review): the "should have raised" checks used to live inside
    ``try: ...; assert False; except: pass``.  The bare ``except`` also
    caught the AssertionError raised by ``assert False``, so those checks
    could never fail.  They are real assertions now.  If one of them fails,
    the gap is in what genmsg.msgs.Constant validates rather than in this
    test -- TODO confirm, in particular for a None ``type_``.
    """
    import genmsg.msgs
    Constant = genmsg.msgs.Constant

    def raises(func, *args):
        """Return True if ``func(*args)`` raises any Exception, else False."""
        # The exception type is deliberately not pinned: the original test
        # accepted any error, and this file does not show which one is used.
        try:
            func(*args)
        except Exception:
            return True
        return False

    # Three random numeric strings stand in for the type, name and value.
    type_, name, val = [str(random.randint(0, 1000)) for _ in range(3)]
    x = Constant(type_, name, val, str(val))
    assert type_ == x.type
    assert name == x.name
    assert val == x.val
    assert x == Constant(type_, name, val, str(val))

    # inequality against a non-Constant and against each differing field
    assert x != 1
    assert not x == 1
    assert x != Constant('baz', name, val, str(val))
    assert x != Constant(type_, 'foo', val, str(val))
    assert x != Constant(type_, name, 'foo', 'foo')

    # tripwire
    assert repr(x)
    assert str(x)

    # a None in any constructor argument must be rejected
    assert raises(Constant, None, name, val, str(val)), "should have raised"
    assert raises(Constant, type_, None, val, str(val)), "should have raised"
    assert raises(Constant, type_, name, None, 'None'), "should have raised"
    assert raises(Constant, type_, name, val, None), "should have raised"

    # setattr(x, 'foo', 'bar') is the call form of ``x.foo = 'bar'``
    assert raises(setattr, x, 'foo', 'bar'), \
        'Constant should not allow arbitrary attr assignment'
|
|
|
def test_MsgSpec():
    """Exercise MsgSpec construction, accessors, equality and string forms.

    Covers: rejection of duplicate and invalid fields, the empty message,
    single- and multi-field messages with and without a header, mismatched
    types/names, comparison against non-MsgSpec values, constants, and
    str()/repr() tripwires.
    """
    from genmsg import MsgSpec, InvalidMsgSpec
    from genmsg.msgs import Constant, Field

    def sub_test_MsgSpec(types, names, constants, text, full_name, has_header):
        """Build a MsgSpec, check its accessors echo the inputs, return it."""
        m = MsgSpec(types, names, constants, text, full_name)
        assert m.types == types
        assert m.names == names
        assert m.text == text
        assert has_header == m.has_header()
        assert m.constants == constants
        assert list(zip(types, names)) == m.fields()
        assert m == MsgSpec(types, names, constants, text, full_name)
        return m

    # don't allow duplicate fields
    try:
        MsgSpec(['int32', 'int64'], ['x', 'x'], [], 'int32 x\nint64 x', 'x/DupFields')
        assert False, "should have raised"
    except InvalidMsgSpec:
        pass
    # don't allow invalid fields
    try:
        MsgSpec(['string['], ['x'], [], 'int32 x\nint64 x', 'x/InvalidFields')
        assert False, "should have raised"
    except InvalidMsgSpec:
        pass

    # allow empty msg
    empty = sub_test_MsgSpec([], [], [], '', 'x/Nothing', False)
    assert [] == empty.fields()
    assert [] == empty.parsed_fields()
    assert 'x/Nothing' == empty.full_name
    assert 'x' == empty.package
    assert 'Nothing' == empty.short_name

    # one-field
    one_field = sub_test_MsgSpec(['int32'], ['x'], [], 'int32 x', 'x/OneInt', False)
    # make sure that equals tests every declared field
    assert one_field == MsgSpec(['int32'], ['x'], [], 'int32 x', 'x/OneInt')
    assert one_field != MsgSpec(['uint32'], ['x'], [], 'int32 x', 'x/OneInt')
    assert one_field != MsgSpec(['int32'], ['y'], [], 'int32 x', 'x/OneInt')
    assert one_field != MsgSpec(['int32'], ['x'], [], 'uint32 x', 'x/OneInt')
    assert one_field != MsgSpec(['int32'], ['x'], [], 'int32 x', 'x/OneIntBad')
    # test against __ne__ as well
    assert one_field != MsgSpec(['int32'], ['x'], [], 'uint32 x', 'x/OneInt')
    assert [Field('x', 'int32')] == one_field.parsed_fields(), "%s vs %s"%([Field('x', 'int32')], one_field.parsed_fields())
    # test str
    assert "int32 x" == str(one_field).strip()

    # test variations of multiple fields and headers
    two_fields = sub_test_MsgSpec(['int32', 'string'], ['x', 'str'], [], 'int32 x\nstring str', 'x/TwoFields', False)
    assert [Field('x', 'int32'), Field('str', 'string')] == two_fields.parsed_fields()

    one_header = sub_test_MsgSpec(['std_msgs/Header'], ['header'], [], 'Header header', 'x/OneHeader', True)
    header_and_fields = sub_test_MsgSpec(['std_msgs/Header', 'int32', 'string'], ['header', 'x', 'str'], [], 'Header header\nint32 x\nstring str', 'x/HeaderAndFields', True)
    embed_types = sub_test_MsgSpec(['std_msgs/Header', 'std_msgs/Int32', 'string'], ['header', 'x', 'str'], [], 'Header header\nstd_msgs/Int32 x\nstring str', 'x/EmbedTypes', True)
    # test strify
    assert "int32 x\nstring str" == str(two_fields).strip()

    # types and names mismatch
    #
    # NOTE(review): three things were wrong with the original check:
    #   * a bare ``except: pass`` also swallowed the AssertionError from
    #     ``assert False``, so the check could never fail;
    #   * the call omitted the full name that every other MsgSpec call in
    #     this test passes, so it presumably failed on the missing argument
    #     rather than on the mismatch -- TODO confirm against MsgSpec;
    #   * the text literal contained the invalid escape ``\i`` where a
    #     newline (``\n``) was intended.
    # Any Exception is accepted because this file does not show which type
    # MsgSpec raises for misaligned types and names.
    mismatch_rejected = False
    try:
        MsgSpec(['int32', 'int32'], ['intval'], [], 'int32 intval\nint32 y', 'x/Mismatch')
    except Exception:
        mismatch_rejected = True
    assert mismatch_rejected, "types and names must align"

    # test (not) equals against non msgspec
    assert not (one_field == 1)
    assert one_field != 1

    # test constants
    msgspec = MsgSpec(['int32'], ['x'], [Constant('int8', 'c', 1, '1')], 'int8 c=1\nuint32 x', 'x/Constants')
    assert msgspec.constants == [Constant('int8', 'c', 1, '1')]
    # tripwire
    str(msgspec)
    repr(msgspec)

    # test that repr doesn't throw an error
    for spec in (empty, one_field, one_header, two_fields, header_and_fields, embed_types):
        repr(spec)
|
|
|
def test_Field():
    """Check Field equality and the attributes derived from its type string."""
    from genmsg.msgs import Field

    # equality and inequality, including against a non-Field value
    reference = Field('foo', 'string')
    assert reference == Field('foo', 'string')
    assert reference != Field('bar', 'string')
    assert reference != Field('foo', 'int32')
    assert reference != 1
    assert not reference == 1

    # (type string, ordered (attribute, expected value) pairs).  Not every
    # case checks every attribute; the lists mirror what is being verified.
    cases = [
        ('string', [
            ('name', 'foo'),
            ('type', 'string'),
            ('base_type', 'string'),
            ('is_array', False),
            ('array_len', None),
            ('is_header', False),
            ('is_builtin', True),
        ]),
        ('std_msgs/String', [
            ('type', 'std_msgs/String'),
            ('base_type', 'std_msgs/String'),
            ('is_array', False),
            ('array_len', None),
            ('is_header', False),
            ('is_builtin', False),
        ]),
        ('std_msgs/String[5]', [
            ('type', 'std_msgs/String[5]'),
            ('base_type', 'std_msgs/String'),
            ('is_array', True),
            ('array_len', 5),
            ('is_header', False),
            ('is_builtin', False),
        ]),
        ('std_msgs/String[]', [
            ('type', 'std_msgs/String[]'),
            ('base_type', 'std_msgs/String'),
            ('is_array', True),
            ('array_len', None),
            ('is_header', False),
            ('is_builtin', False),
        ]),
        ('std_msgs/Header', [
            ('type', 'std_msgs/Header'),
            ('is_header', True),
            ('is_builtin', False),
        ]),
        ('std_msgs/Header[]', [
            ('type', 'std_msgs/Header[]'),
            ('is_header', False),
        ]),
    ]
    field = reference
    for type_string, expected_attrs in cases:
        field = Field('foo', type_string)
        for attr, expected in expected_attrs:
            assert getattr(field, attr) == expected

    # tripwire: repr of the last field built must not raise
    repr(field)
|
|
|
def test_is_valid_msg_type():
    """Check is_valid_msg_type accepts the good names and rejects the bad ones."""
    import genmsg.msgs

    is_valid = genmsg.msgs.is_valid_msg_type

    accepted = [
        # basic
        'F', 'f', 'Foo', 'Foo1',
        'std_msgs/String',
        # arrays
        'Foo[]', 'Foo[1]', 'Foo[10]',
    ]
    # bad cases
    rejected = [
        None, '', '#', '%', 'Foo%', 'Woo Woo',
        '/', '/String',
        'Foo[f]', 'Foo[1d]', 'Foo[-1]', 'Foo[1:10]', 'Foo[', 'Foo]', 'Foo[]Bar',
    ]

    for candidate in accepted:
        assert is_valid(candidate), "genmsg.msgs.is_valid_msg_type should have returned True for '%s'"%candidate

    for candidate in rejected:
        assert not is_valid(candidate), "genmsg.msgs.is_valid_msg_type should have returned False for '%s'"%candidate
|
|
|
def test_is_valid_constant_type():
    """Check which type names is_valid_constant_type accepts for constants."""
    import genmsg.msgs

    check = genmsg.msgs.is_valid_constant_type

    # (type names, whether they should be accepted); accepted names first.
    groups = (
        (['int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64',
          'uint64', 'float32', 'float64', 'char', 'byte', 'string'], True),
        (['std_msgs/String', '/', 'String',
          'time', 'duration', 'header'], False),
    )
    for type_names, should_accept in groups:
        for type_name in type_names:
            if should_accept:
                assert check(type_name), "genmsg.msgs.is_valid_constant_type should have returned True for '%s'"%type_name
            else:
                assert not check(type_name), "genmsg.msgs.is_valid_constant_type should have returned False for '%s'"%type_name
|
|
|
|