Browse Source

fix: uORB topics lost messages when publications overflow

sbg
FengShun 4 years ago committed by Beat Küng
parent
commit
d19b54481a
  1. 2
      msg/gps_inject_data.msg
  2. 2
      msg/mavlink_log.msg
  3. 2
      msg/orb_test_medium.msg
  4. 2
      msg/sensor_mag.msg
  5. 2
      msg/subsystem_info.msg
  6. 2
      msg/transponder_report.msg
  7. 2
      msg/tune_control.msg
  8. 2
      msg/uavcan_parameter_request.msg
  9. 2
      msg/ulog_stream.msg
  10. 2
      msg/vehicle_command.msg
  11. 2
      msg/vehicle_command_ack.msg
  12. 5
      src/modules/uORB/Subscription.cpp
  13. 2
      src/modules/uORB/Subscription.hpp
  14. 2
      src/modules/uORB/SubscriptionCallback.hpp
  15. 9
      src/modules/uORB/uORBDeviceMaster.cpp
  16. 70
      src/modules/uORB/uORBDeviceNode.cpp
  17. 20
      src/modules/uORB/uORBDeviceNode.hpp
  18. 164
      src/modules/uORB/uORB_tests/uORBTest_UnitTest.cpp
  19. 11
      src/modules/uORB/uORB_tests/uORBTest_UnitTest.hpp

2
msg/gps_inject_data.msg

@ -3,4 +3,4 @@ uint8 len # length of data @@ -3,4 +3,4 @@ uint8 len # length of data
uint8 flags # LSB: 1=fragmented
uint8[182] data # data to write to GPS device (RTCM message)
uint8 ORB_QUEUE_LENGTH = 6
uint8 ORB_QUEUE_LENGTH = 8

2
msg/mavlink_log.msg

@ -3,4 +3,4 @@ uint64 timestamp # time since system start (microseconds) @@ -3,4 +3,4 @@ uint64 timestamp # time since system start (microseconds)
char[127] text
uint8 severity # log level (same as in the linux kernel, starting with 0)
uint8 ORB_QUEUE_LENGTH = 5
uint8 ORB_QUEUE_LENGTH = 4

2
msg/orb_test_medium.msg

@ -4,4 +4,4 @@ int32 val @@ -4,4 +4,4 @@ int32 val
uint8[64] junk
# TOPICS orb_test_medium orb_test_medium_multi orb_test_medium_queue orb_test_medium_queue_poll
# TOPICS orb_test_medium orb_test_medium_multi orb_test_medium_wrap_around orb_test_medium_queue orb_test_medium_queue_poll

2
msg/sensor_mag.msg

@ -13,4 +13,4 @@ uint32 error_count @@ -13,4 +13,4 @@ uint32 error_count
bool is_external # if true the mag is external (i.e. not built into the board)
uint8 ORB_QUEUE_LENGTH = 3
uint8 ORB_QUEUE_LENGTH = 4

2
msg/subsystem_info.msg

@ -36,4 +36,4 @@ bool enabled @@ -36,4 +36,4 @@ bool enabled
bool ok
uint64 subsystem_type
uint8 ORB_QUEUE_LENGTH = 5
uint8 ORB_QUEUE_LENGTH = 8

2
msg/transponder_report.msg

@ -47,4 +47,4 @@ uint16 ADSB_EMITTER_TYPE_SERVICE_SURFACE=18 @@ -47,4 +47,4 @@ uint16 ADSB_EMITTER_TYPE_SERVICE_SURFACE=18
uint16 ADSB_EMITTER_TYPE_POINT_OBSTACLE=19
uint16 ADSB_EMITTER_TYPE_ENUM_END=20
uint8 ORB_QUEUE_LENGTH = 10
uint8 ORB_QUEUE_LENGTH = 8

2
msg/tune_control.msg

@ -35,4 +35,4 @@ uint8 VOLUME_LEVEL_MIN = 0 @@ -35,4 +35,4 @@ uint8 VOLUME_LEVEL_MIN = 0
uint8 VOLUME_LEVEL_DEFAULT = 40
uint8 VOLUME_LEVEL_MAX = 100
uint8 ORB_QUEUE_LENGTH = 3
uint8 ORB_QUEUE_LENGTH = 4

2
msg/uavcan_parameter_request.msg

@ -20,4 +20,4 @@ uint8 param_type # MAVLink parameter type @@ -20,4 +20,4 @@ uint8 param_type # MAVLink parameter type
int64 int_value # current value if param_type is int-like
float32 real_value # current value if param_type is float-like
uint8 ORB_QUEUE_LENGTH = 3
uint8 ORB_QUEUE_LENGTH = 4

2
msg/ulog_stream.msg

@ -16,4 +16,4 @@ uint16 msg_sequence # allows determine drops @@ -16,4 +16,4 @@ uint16 msg_sequence # allows determine drops
uint8 flags # see FLAGS_*
uint8[249] data # ulog data
uint8 ORB_QUEUE_LENGTH = 14 # TODO: we might be able to reduce this if mavlink polled on the topic
uint8 ORB_QUEUE_LENGTH = 16 # TODO: we might be able to reduce this if mavlink polled on the topic

2
msg/vehicle_command.msg

@ -133,7 +133,7 @@ uint8 FAILURE_TYPE_SLOW = 5 @@ -133,7 +133,7 @@ uint8 FAILURE_TYPE_SLOW = 5
uint8 FAILURE_TYPE_DELAYED = 6
uint8 FAILURE_TYPE_INTERMITTENT = 7
uint8 ORB_QUEUE_LENGTH = 3
uint8 ORB_QUEUE_LENGTH = 4
float32 param1 # Parameter 1, as defined by MAVLink uint16 VEHICLE_CMD enum.
float32 param2 # Parameter 2, as defined by MAVLink uint16 VEHICLE_CMD enum.

2
msg/vehicle_command_ack.msg

@ -13,7 +13,7 @@ uint16 ARM_AUTH_DENIED_REASON_TIMEOUT = 3 @@ -13,7 +13,7 @@ uint16 ARM_AUTH_DENIED_REASON_TIMEOUT = 3
uint16 ARM_AUTH_DENIED_REASON_AIRSPACE_IN_USE = 4
uint16 ARM_AUTH_DENIED_REASON_BAD_WEATHER = 5
uint8 ORB_QUEUE_LENGTH = 3
uint8 ORB_QUEUE_LENGTH = 4
uint16 command
uint8 result

5
src/modules/uORB/Subscription.cpp

@ -65,10 +65,7 @@ bool Subscription::subscribe() @@ -65,10 +65,7 @@ bool Subscription::subscribe()
_node = node;
_node->add_internal_subscriber();
const unsigned curr_gen = _node->published_message_count();
// If there were any previous publications allow the subscriber to read them
_last_generation = curr_gen - math::min((unsigned)_node->get_queue_size(), curr_gen);
_last_generation = _node->get_initial_generation();
return true;
}

2
src/modules/uORB/Subscription.hpp

@ -137,7 +137,7 @@ public: @@ -137,7 +137,7 @@ public:
/**
* Check if there is a new update.
*/
bool updated() { return advertised() && (_node->published_message_count() != _last_generation); }
bool updated() { return advertised() && _node->updates_available(_last_generation); }
/**
* Update the struct

2
src/modules/uORB/SubscriptionCallback.hpp

@ -160,7 +160,7 @@ public: @@ -160,7 +160,7 @@ public:
{
// schedule immediately if updated (queue depth or subscription interval)
if ((_required_updates == 0)
|| (_subscription.get_node()->published_message_count() >= (_subscription.get_last_generation() + _required_updates))) {
|| (_subscription.get_node()->updates_available(_subscription.get_last_generation()) >= _required_updates)) {
if (updated()) {
_work_item->ScheduleNow();
}

9
src/modules/uORB/uORBDeviceMaster.cpp

@ -268,7 +268,8 @@ int uORB::DeviceMaster::addNewDeviceNodes(DeviceNodeStatisticsData **first_node, @@ -268,7 +268,8 @@ int uORB::DeviceMaster::addNewDeviceNodes(DeviceNodeStatisticsData **first_node,
max_topic_name_length = name_length;
}
last_node->last_pub_msg_count = last_node->node->published_message_count();
// Pass in 0 to get the index of the latest published data
last_node->last_pub_msg_count = last_node->node->updates_available(0);
}
return 0;
@ -374,9 +375,9 @@ void uORB::DeviceMaster::showTop(char **topic_filter, int num_filters) @@ -374,9 +375,9 @@ void uORB::DeviceMaster::showTop(char **topic_filter, int num_filters)
cur_node = first_node;
while (cur_node) {
unsigned int num_msgs = cur_node->node->published_message_count();
cur_node->pub_msg_delta = roundf((num_msgs - cur_node->last_pub_msg_count) / dt);
cur_node->last_pub_msg_count = num_msgs;
unsigned int num_msgs = cur_node->node->updates_available(cur_node->last_pub_msg_count);
cur_node->pub_msg_delta = roundf(num_msgs / dt);
cur_node->last_pub_msg_count += num_msgs;
cur_node = cur_node->next;
}

70
src/modules/uORB/uORBDeviceNode.cpp

@ -44,12 +44,48 @@ @@ -44,12 +44,48 @@
static uORB::SubscriptionInterval *filp_to_subscription(cdev::file_t *filp) { return static_cast<uORB::SubscriptionInterval *>(filp->f_priv); }
// Determine the data range
static inline bool is_in_range(unsigned left, unsigned value, unsigned right)
{
if (right > left) {
return (left <= value) && (value <= right);
} else { // Maybe the data overflowed and a wraparound occurred
return (left <= value) || (value <= right);
}
}
// round up to nearest power of two
// Such as 0 => 1, 1 => 1, 2 => 2 ,3 => 4, 10 => 16, 60 => 64, 65...255 => 128
// Note: When the input value > 128, the output is always 128
static inline uint8_t round_pow_of_two_8(uint8_t n)
{
if (n == 0) {
return 1;
}
// Avoid is already a power of 2
uint8_t value = n - 1;
// Fill 1
value |= value >> 1U;
value |= value >> 2U;
value |= value >> 4U;
// Unable to round-up, take the value of round-down
if (value == UINT8_MAX) {
value >>= 1U;
}
return value + 1;
}
uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const uint8_t instance, const char *path,
uint8_t queue_size) :
CDev(path),
_meta(meta),
_instance(instance),
_queue_size(queue_size)
_queue_size(round_pow_of_two_8(queue_size))
{
}
@ -130,24 +166,23 @@ uORB::DeviceNode::copy(void *dst, unsigned &generation) @@ -130,24 +166,23 @@ uORB::DeviceNode::copy(void *dst, unsigned &generation)
ATOMIC_ENTER;
const unsigned current_generation = _generation.load();
if (current_generation > generation + _queue_size) {
// Reader is too far behind: some messages are lost
generation = current_generation - _queue_size;
}
if ((current_generation == generation) && (generation > 0)) {
if (current_generation == generation) {
/* The subscriber already read the latest message, but nothing new was published yet.
* Return the previous message
*/
--generation;
}
// Compatible with normal and overflow conditions
if (!is_in_range(current_generation - _queue_size, generation, current_generation - 1)) {
// Reader is too far behind: some messages are lost
generation = current_generation - _queue_size;
}
memcpy(dst, _data + (_meta->o_size * (generation % _queue_size)), _meta->o_size);
ATOMIC_LEAVE;
if (generation < current_generation) {
++generation;
}
return true;
}
@ -223,6 +258,9 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen) @@ -223,6 +258,9 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
item->call();
}
/* Mark at least one data has been published */
_data_valid = true;
ATOMIC_LEAVE;
/* notify any poll waiters */
@ -502,10 +540,22 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size) @@ -502,10 +540,22 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size)
return PX4_ERROR;
}
_queue_size = queue_size;
_queue_size = round_pow_of_two_8(queue_size);
return PX4_OK;
}
unsigned uORB::DeviceNode::get_initial_generation()
{
ATOMIC_ENTER;
// If there any previous publications allow the subscriber to read them
unsigned generation = _generation.load() - (_data_valid ? 1 : 0);
ATOMIC_LEAVE;
return generation;
}
bool
uORB::DeviceNode::register_callback(uORB::SubscriptionCallback *callback_sub)
{

20
src/modules/uORB/uORBDeviceNode.hpp

@ -50,6 +50,11 @@ class Manager; @@ -50,6 +50,11 @@ class Manager;
class SubscriptionCallback;
}
namespace uORBTest
{
class UnitTest;
}
/**
* Per-object device instance.
*/
@ -188,7 +193,18 @@ public: @@ -188,7 +193,18 @@ public:
int8_t subscriber_count() const { return _subscriber_count; }
unsigned published_message_count() const { return _generation.load(); }
/**
* Returns the number of updated data relative to the parameter 'generation'
* We can get the correct value regardless of wrap-around or not.
* @param generation The generation of subscriber
*/
unsigned updates_available(unsigned generation) const { return _generation.load() - generation; }
/**
* Return the initial generation to the subscriber
* @return The initial generation.
*/
unsigned get_initial_generation();
const orb_metadata *get_meta() const { return _meta; }
@ -224,10 +240,12 @@ protected: @@ -224,10 +240,12 @@ protected:
void poll_notify_one(px4_pollfd_struct_t *fds, px4_pollevent_t events) override;
private:
friend uORBTest::UnitTest;
const orb_metadata *_meta; /**< object metadata information */
uint8_t *_data{nullptr}; /**< allocated object buffer */
bool _data_valid{false}; /**< At least one valid data */
px4::atomic<unsigned> _generation{0}; /**< object generation count */
List<uORB::SubscriptionCallback *> _callbacks;

164
src/modules/uORB/uORB_tests/uORBTest_UnitTest.cpp

@ -202,6 +202,12 @@ int uORBTest::UnitTest::test() @@ -202,6 +202,12 @@ int uORBTest::UnitTest::test()
return ret;
}
ret = test_wrap_around();
if (ret != OK) {
return ret;
}
ret = test_queue();
if (ret != OK) {
@ -582,6 +588,162 @@ int uORBTest::UnitTest::test_multi_reversed() @@ -582,6 +588,162 @@ int uORBTest::UnitTest::test_multi_reversed()
return test_note("PASS multi-topic reversed");
}
int uORBTest::UnitTest::test_wrap_around()
{
test_note("Testing orb wrap-around");
orb_test_medium_s t{};
orb_test_medium_s u{};
orb_advert_t ptopic{nullptr};
bool updated{false};
// Advertise but not publish topics, only generate device_node, which is convenient for modifying DeviceNode::_generation
const int queue_size = 16;
ptopic = orb_advertise_queue(ORB_ID(orb_test_medium_wrap_around), nullptr, queue_size);
if (ptopic == nullptr) {
return test_fail("advertise failed: %d", errno);
}
auto node = uORB::Manager::get_instance()->get_device_master()->getDeviceNode(ORB_ID(orb_test_medium_wrap_around), 0);
if (node == nullptr) {
return test_fail("get device node failed.");
}
// Set generation
{
// Set generation to the location where wrap-around is about to be: (0 - queue_size / 2) => (UINT_MAX - queue_size / 2 + 1)
set_generation(*node, unsigned(-(queue_size / 2)));
if (node->updates_available(0) != unsigned(-(queue_size / 2))) {
return test_fail("set generation failed.");
}
}
t.val = 0;
orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t);
int sfd = orb_subscribe(ORB_ID(orb_test_medium_wrap_around));
if (sfd < 0) {
return test_fail("subscribe failed: %d", errno);
}
orb_check(sfd, &updated);
if (!updated) {
return test_fail("update flag not set");
}
if (PX4_OK != orb_copy(ORB_ID(orb_test_medium_wrap_around), sfd, &u)) {
return test_fail("copy(1) failed: %d", errno);
}
if (u.val != t.val) {
return test_fail("copy(1) mismatch: %d expected %d", u.val, t.val);
}
orb_check(sfd, &updated);
if (updated) {
return test_fail("spurious updated flag");
}
#define CHECK_UPDATED(element) \
orb_check(sfd, &updated); \
if (!updated) { \
return test_fail("update flag not set, element %i", element); \
}
#define CHECK_NOT_UPDATED(element) \
orb_check(sfd, &updated); \
if (updated) { \
return test_fail("update flag set, element %i", element); \
}
#define CHECK_COPY(i_got, i_correct) \
orb_copy(ORB_ID(orb_test_medium_wrap_around), sfd, &u); \
if (i_got != i_correct) { \
return test_fail("got wrong element from the queue (got %i, should be %i)", i_got, i_correct); \
}
//no messages in the queue anymore
test_note(" Testing to write some elements...");
for (int i = 0; i < queue_size - 2; ++i) {
t.val = i;
orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t);
}
for (int i = 0; i < queue_size - 2; ++i) {
CHECK_UPDATED(i);
CHECK_COPY(u.val, i);
}
CHECK_NOT_UPDATED(queue_size);
#define SET_GENERTATION() \
set_generation(*node, unsigned(-(queue_size / 2))); \
if (node->updates_available(0) != unsigned(-(queue_size / 2))) { \
return test_fail("set generation failed."); \
} \
for (int i = 0; i < queue_size; i++) { \
if (PX4_OK != orb_copy(ORB_ID(orb_test_medium_wrap_around), sfd, &u)) { \
break; \
} \
}
test_note(" Testing overflow...");
int overflow_by = 3;
SET_GENERTATION();
for (int i = 0; i < queue_size + overflow_by; ++i) {
t.val = i;
orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t);
}
for (int i = 0; i < queue_size; ++i) {
CHECK_UPDATED(i);
CHECK_COPY(u.val, i + overflow_by);
}
CHECK_NOT_UPDATED(queue_size);
test_note(" Testing underflow...");
SET_GENERTATION();
t.val = queue_size;
orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t);
CHECK_UPDATED(-1);
CHECK_COPY(u.val, t.val);
for (int i = 0; i < queue_size; ++i) {
CHECK_NOT_UPDATED(i);
CHECK_COPY(u.val, t.val);
}
#undef SET_GENERTATION
t.val = 943;
orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t);
CHECK_UPDATED(-1);
test_note("before copy");
CHECK_COPY(u.val, t.val);
#undef CHECK_COPY
#undef CHECK_UPDATED
#undef CHECK_NOT_UPDATED
test_note("before unadvertise");
orb_unadvertise(ptopic);
orb_unsubscribe(sfd);
return test_note("PASS orb queuing");
}
int uORBTest::UnitTest::test_queue()
{
test_note("Testing orb queuing");
@ -597,7 +759,7 @@ int uORBTest::UnitTest::test_queue() @@ -597,7 +759,7 @@ int uORBTest::UnitTest::test_queue()
return test_fail("subscribe failed: %d", errno);
}
const int queue_size = 11;
const int queue_size = 16;
t.val = 0;
ptopic = orb_advertise_queue(ORB_ID(orb_test_medium_queue), &t, queue_size);

11
src/modules/uORB/uORB_tests/uORBTest_UnitTest.hpp

@ -35,6 +35,9 @@ @@ -35,6 +35,9 @@
#define _uORBTest_UnitTest_hpp_
#include <uORB/uORB.h>
#include <uORB/uORBDeviceMaster.hpp>
#include <uORB/uORBDeviceNode.hpp>
#include <uORB/uORBManager.hpp>
#include <uORB/topics/orb_test.h>
#include <uORB/topics/orb_test_medium.h>
#include <uORB/topics/orb_test_large.h>
@ -68,6 +71,12 @@ public: @@ -68,6 +71,12 @@ public:
// Disallow copy
UnitTest(const uORBTest::UnitTest & /*unused*/) = delete;
// Assist in testing the wrap-around situation
static void set_generation(uORB::DeviceNode &node, unsigned generation)
{
node._generation.store(generation);
}
private:
UnitTest() : pubsubtest_passed(false), pubsubtest_print(false) {}
@ -94,6 +103,8 @@ private: @@ -94,6 +103,8 @@ private:
int test_multi2();
int test_wrap_around();
/* queuing tests */
int test_queue();
static int pub_test_queue_entry(int argc, char *argv[]);

Loading…
Cancel
Save