diff --git a/src/drivers/drv_orb_dev.h b/src/drivers/drv_orb_dev.h index 79a2895fb5..710eb3d175 100644 --- a/src/drivers/drv_orb_dev.h +++ b/src/drivers/drv_orb_dev.h @@ -87,4 +87,7 @@ /** Get the priority for the topic */ #define ORBIOCGPRIORITY _ORBIOC(14) +/** Set the queue size of the topic */ +#define ORBIOCSETQUEUESIZE _ORBIOC(15) + #endif /* _DRV_UORB_H */ diff --git a/src/modules/uORB/uORB.cpp b/src/modules/uORB/uORB.cpp index 375f261ca8..cf32658673 100644 --- a/src/modules/uORB/uORB.cpp +++ b/src/modules/uORB/uORB.cpp @@ -45,12 +45,23 @@ orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data) return uORB::Manager::get_instance()->orb_advertise(meta, data); } +orb_advert_t orb_advertise_queue(const struct orb_metadata *meta, const void *data, int queue_size) +{ + return uORB::Manager::get_instance()->orb_advertise(meta, data, queue_size); +} + orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance, int priority) { return uORB::Manager::get_instance()->orb_advertise_multi(meta, data, instance, priority); } +orb_advert_t orb_advertise_multi_queue(const struct orb_metadata *meta, const void *data, int *instance, + int priority, int queue_size) +{ + return uORB::Manager::get_instance()->orb_advertise_multi(meta, data, instance, priority, queue_size); +} + int orb_unadvertise(orb_advert_t handle) { return uORB::Manager::get_instance()->orb_unadvertise(handle); diff --git a/src/modules/uORB/uORB.h b/src/modules/uORB/uORB.h index 57e12f9236..e189476f32 100644 --- a/src/modules/uORB/uORB.h +++ b/src/modules/uORB/uORB.h @@ -140,12 +140,24 @@ typedef void *orb_advert_t; */ extern orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data) __EXPORT; +/** + * @see uORB::Manager::orb_advertise() + */ +extern orb_advert_t orb_advertise_queue(const struct orb_metadata *meta, const void *data, + int queue_size) __EXPORT; + /** * @see uORB::Manager::orb_advertise_multi() */ extern orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance, int priority) __EXPORT; +/** + * @see uORB::Manager::orb_advertise_multi() + */ +extern orb_advert_t orb_advertise_multi_queue(const struct orb_metadata *meta, const void *data, int *instance, + int priority, int queue_size) __EXPORT; + /** * @see uORB::Manager::orb_unadvertise() */ diff --git a/src/modules/uORB/uORBDevices_nuttx.cpp b/src/modules/uORB/uORBDevices_nuttx.cpp index 7eb6e52fa4..e27f55d297 100644 --- a/src/modules/uORB/uORBDevices_nuttx.cpp +++ b/src/modules/uORB/uORBDevices_nuttx.cpp @@ -50,7 +50,8 @@ uORB::DeviceNode::DeviceNode const struct orb_metadata *meta, const char *name, const char *path, - int priority + int priority, + unsigned int queue_size ) : CDev(name, path), _meta(meta), @@ -60,6 +61,7 @@ uORB::DeviceNode::DeviceNode _publisher(0), _priority(priority), _published(false), + _queue_size(queue_size), _IsRemoteSubscriberPresent(false), _subscriber_count(0) { @@ -185,13 +187,26 @@ uORB::DeviceNode::read(struct file *filp, char *buffer, size_t buflen) */ irqstate_t flags = px4_enter_critical_section(); + if (_generation > sd->generation + _queue_size) { + /* Reader is too far behind: some messages are lost */ + sd->generation = _generation - _queue_size; + } + + if (_generation == sd->generation && sd->generation > 0) { + /* The subscriber already read the latest message, but nothing new was published yet. + * Return the previous message + */ + --sd->generation; + } + /* if the caller doesn't want the data, don't give it to them */ if (nullptr != buffer) { - memcpy(buffer, _data, _meta->o_size); + memcpy(buffer, _data + (_meta->o_size * (sd->generation % _queue_size)), _meta->o_size); } - /* track the last generation that the file has seen */ - sd->generation = _generation; + if (sd->generation < _generation) { + ++sd->generation; + } /* set priority */ sd->priority = _priority; @@ -226,7 +241,7 @@ uORB::DeviceNode::write(struct file *filp, const char *buffer, size_t buflen) /* re-check size */ if (nullptr == _data) { - _data = new uint8_t[_meta->o_size]; + _data = new uint8_t[_meta->o_size * _queue_size]; } unlock(); @@ -245,10 +260,11 @@ uORB::DeviceNode::write(struct file *filp, const char *buffer, size_t buflen) /* Perform an atomic copy. */ irqstate_t flags = px4_enter_critical_section(); - memcpy(_data, buffer, _meta->o_size); + memcpy(_data + (_meta->o_size * (_generation % _queue_size)), buffer, _meta->o_size); /* update the timestamp and generation count */ _last_update = hrt_absolute_time(); + /* wrap-around happens after ~49 days, assuming a publisher rate of 1 kHz */ _generation++; _published = true; @@ -290,6 +306,11 @@ uORB::DeviceNode::ioctl(struct file *filp, int cmd, unsigned long arg) *(int *)arg = sd->priority; return OK; + case ORBIOCSETQUEUESIZE: + //no need for locking here, since this is used only during the advertisement call, + //and only one advertiser is allowed to open the DeviceNode at the same time. + return update_queue_size(arg); + default: /* give it to the superclass */ return CDev::ioctl(filp, cmd, arg); @@ -522,6 +543,20 @@ bool uORB::DeviceNode::is_published() return _published; } +int uORB::DeviceNode::update_queue_size(unsigned int queue_size) +{ + if (_queue_size == queue_size) { + return PX4_OK; + } + + if (_data || _queue_size > queue_size) { + return ERROR; + } + + _queue_size = queue_size; + return PX4_OK; +} + //----------------------------------------------------------------------------- //----------------------------------------------------------------------------- int16_t uORB::DeviceNode::process_add_subscription(int32_t rateInHz) diff --git a/src/modules/uORB/uORBDevices_nuttx.hpp b/src/modules/uORB/uORBDevices_nuttx.hpp index 1519b06a82..2015bff986 100644 --- a/src/modules/uORB/uORBDevices_nuttx.hpp +++ b/src/modules/uORB/uORBDevices_nuttx.hpp @@ -61,7 +61,8 @@ public: const struct orb_metadata *meta, const char *name, const char *path, - int priority + int priority, + unsigned int queue_size = 1 ); /** @@ -168,6 +169,15 @@ public: * and publish to this node or if another node should be tried. */ bool is_published(); + /** + * Try to change the size of the queue. This can only be done as long as nobody published yet. + * This is the case, for example when orb_subscribe was called before an orb_advertise. + * The queue size can only be increased. + * @param queue_size new size of the queue + * @return PX4_OK if queue size successfully set + */ + int update_queue_size(unsigned int queue_size); + protected: virtual pollevent_t poll_state(struct file *filp); virtual void poll_notify_one(struct pollfd *fds, pollevent_t events); @@ -190,6 +200,7 @@ private: We allow one publisher to have an open file descriptor at the same time. */ const int _priority; /**< priority of topic */ bool _published; /**< has ever data been published */ + unsigned int _queue_size; /**< maximum number of elements in the queue */ private: // private class methods. diff --git a/src/modules/uORB/uORBDevices_posix.cpp b/src/modules/uORB/uORBDevices_posix.cpp index d641859875..c5cd4b8fbd 100644 --- a/src/modules/uORB/uORBDevices_posix.cpp +++ b/src/modules/uORB/uORBDevices_posix.cpp @@ -61,7 +61,8 @@ uORB::DeviceNode::SubscriberData *uORB::DeviceNode::filp_to_sd(device::file_t * return sd; } -uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const char *name, const char *path, int priority) : +uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const char *name, const char *path, + int priority, unsigned int queue_size) : VDev(name, path), _meta(meta), _data(nullptr), @@ -70,6 +71,7 @@ uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const char *name, _publisher(0), _priority(priority), _published(false), + _queue_size(queue_size), _subscriber_count(0) { // enable debug() calls @@ -198,13 +200,26 @@ uORB::DeviceNode::read(device::file_t *filp, char *buffer, size_t buflen) */ lock(); + if (_generation > sd->generation + _queue_size) { + /* Reader is too far behind: some messages are lost */ + sd->generation = _generation - _queue_size; + } + + if (_generation == sd->generation && sd->generation > 0) { + /* The subscriber already read the latest message, but nothing new was published yet. + * Return the previous message + */ + --sd->generation; + } + /* if the caller doesn't want the data, don't give it to them */ if (nullptr != buffer) { - memcpy(buffer, _data, _meta->o_size); + memcpy(buffer, _data + (_meta->o_size * (sd->generation % _queue_size)), _meta->o_size); } - /* track the last generation that the file has seen */ - sd->generation = _generation; + if (sd->generation < _generation) { + ++sd->generation; + } /* set priority */ sd->priority = _priority; @@ -238,7 +253,7 @@ uORB::DeviceNode::write(device::file_t *filp, const char *buffer, size_t buflen) /* re-check size */ if (nullptr == _data) { - _data = new uint8_t[_meta->o_size]; + _data = new uint8_t[_meta->o_size * _queue_size]; } unlock(); @@ -255,10 +270,11 @@ uORB::DeviceNode::write(device::file_t *filp, const char *buffer, size_t buflen) } lock(); - memcpy(_data, buffer, _meta->o_size); + memcpy(_data + (_meta->o_size * (_generation % _queue_size)), buffer, _meta->o_size); /* update the timestamp and generation count */ _last_update = hrt_absolute_time(); + /* wrap-around happens after ~49 days, assuming a publisher rate of 1 kHz */ _generation++; _published = true; @@ -305,6 +321,11 @@ uORB::DeviceNode::ioctl(device::file_t *filp, int cmd, unsigned long arg) *(int *)arg = sd->priority; return PX4_OK; + case ORBIOCSETQUEUESIZE: + //no need for locking here, since this is used only during the advertisement call, + //and only one advertiser is allowed to open the DeviceNode at the same time. + return update_queue_size(arg); + default: /* give it to the superclass */ return VDev::ioctl(filp, cmd, arg); @@ -527,6 +548,20 @@ bool uORB::DeviceNode::is_published() return _published; } +int uORB::DeviceNode::update_queue_size(unsigned int queue_size) +{ + if (_queue_size == queue_size) { + return PX4_OK; + } + + if (_data || _queue_size > queue_size) { + return ERROR; + } + + _queue_size = queue_size; + return PX4_OK; +} + //----------------------------------------------------------------------------- //----------------------------------------------------------------------------- int16_t uORB::DeviceNode::process_add_subscription(int32_t rateInHz) diff --git a/src/modules/uORB/uORBDevices_posix.hpp b/src/modules/uORB/uORBDevices_posix.hpp index 963ed4a79d..35882accaa 100644 --- a/src/modules/uORB/uORBDevices_posix.hpp +++ b/src/modules/uORB/uORBDevices_posix.hpp @@ -48,7 +48,8 @@ class DeviceMaster; class uORB::DeviceNode : public device::VDev { public: - DeviceNode(const struct orb_metadata *meta, const char *name, const char *path, int priority); + DeviceNode(const struct orb_metadata *meta, const char *name, const char *path, + int priority, unsigned int queue_size = 1); ~DeviceNode(); virtual int open(device::file_t *filp); @@ -105,6 +106,15 @@ public: * and publish to this node or if another node should be tried. */ bool is_published(); + /** + * Try to change the size of the queue. This can only be done as long as nobody published yet. + * This is the case, for example when orb_subscribe was called before an orb_advertise. + * The queue size can only be increased. + * @param queue_size new size of the queue + * @return PX4_OK if queue size successfully set + */ + int update_queue_size(unsigned int queue_size); + protected: virtual pollevent_t poll_state(device::file_t *filp); virtual void poll_notify_one(px4_pollfd_struct_t *fds, pollevent_t events); @@ -128,6 +138,7 @@ private: We allow one publisher to have an open file descriptor at the same time. */ const int _priority; /**< priority of topic */ bool _published; /**< has ever data been published */ + unsigned int _queue_size; /**< maximum number of elements in the queue */ static SubscriberData *filp_to_sd(device::file_t *filp); diff --git a/src/modules/uORB/uORBManager.cpp b/src/modules/uORB/uORBManager.cpp index c1266ba04a..6c2309b13f 100644 --- a/src/modules/uORB/uORBManager.cpp +++ b/src/modules/uORB/uORBManager.cpp @@ -87,14 +87,14 @@ int uORB::Manager::orb_exists(const struct orb_metadata *meta, int instance) #endif } -orb_advert_t uORB::Manager::orb_advertise(const struct orb_metadata *meta, const void *data) +orb_advert_t uORB::Manager::orb_advertise(const struct orb_metadata *meta, const void *data, int queue_size) { //warnx("orb_advertise meta = %p", meta); - return orb_advertise_multi(meta, data, nullptr, ORB_PRIO_DEFAULT); + return orb_advertise_multi(meta, data, nullptr, ORB_PRIO_DEFAULT, queue_size); } orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance, - int priority) + int priority, int queue_size) { int result, fd; orb_advert_t advertiser; @@ -109,6 +109,15 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, return nullptr; } + /* Set the queue size. This must be done before the first publication; thus it fails if + * this is not the first advertiser. + */ + result = px4_ioctl(fd, ORBIOCSETQUEUESIZE, (unsigned long)queue_size); + + if (result < 0 && queue_size > 1) { + PX4_WARN("orb_advertise_multi: failed to set queue size"); + } + /* get the advertiser handle and close the node */ result = px4_ioctl(fd, ORBIOCGADVERTISER, (unsigned long)&advertiser); px4_close(fd); diff --git a/src/modules/uORB/uORBManager.hpp b/src/modules/uORB/uORBManager.hpp index ca5aee846e..21f5777265 100644 --- a/src/modules/uORB/uORBManager.hpp +++ b/src/modules/uORB/uORBManager.hpp @@ -96,13 +96,15 @@ public: * @param data A pointer to the initial data to be published. * For topics updated by interrupt handlers, the advertisement * must be performed from non-interrupt context. + * @param queue_size Maximum number of buffered elements. If this is 1, no queuing is + * used. * @return nullptr on error, otherwise returns an object pointer * that can be used to publish to the topic. * If the topic in question is not known (due to an * ORB_DEFINE with no corresponding ORB_DECLARE) * this function will return nullptr and set errno to ENOENT. */ - orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data); + orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data, int queue_size = 1); /** * Advertise as the publisher of a topic. @@ -130,6 +132,8 @@ public: * instances, the priority allows the subscriber to prioritize the best * data source as long as its available. The subscriber is responsible to check * and handle different priorities (@see orb_priority()). + * @param queue_size Maximum number of buffered elements. If this is 1, no queuing is + * used. * @return ERROR on error, otherwise returns a handle * that can be used to publish to the topic. * If the topic in question is not known (due to an @@ -137,7 +141,7 @@ public: * this function will return -1 and set errno to ENOENT. */ orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance, - int priority) ; + int priority, int queue_size = 1) ; /** @@ -272,7 +276,8 @@ public: int orb_check(int handle, bool *updated) ; /** - * Return the last time that the topic was updated. + * Return the last time that the topic was updated. If a queue is used, it returns + * the timestamp of the latest element in the queue. * * @param handle A handle returned from orb_subscribe. * @param time Returns the absolute time that the topic was updated, or zero if it has