Browse Source

orb: add optional queuing of messages

This adds two uORB API calls:
- orb_advertise_queue
- orb_advertise_multi_queue

Both add a queue_size parameter to define a maximum number of buffered
item. The existing orb calls use all a queue size of one and thus their
behavior is unchanged. If a writer publishes too fast, the oldest elements
from the queue are silently dropped.
The returned timestamp is always the one from the latest message in the
queue.

Queue size can be set via ioctl during advertisement phase. After that it
cannot be changed anymore.
sbg
Beat Küng 9 years ago committed by Lorenz Meier
parent
commit
5b1273e334
  1. 3
      src/drivers/drv_orb_dev.h
  2. 11
      src/modules/uORB/uORB.cpp
  3. 12
      src/modules/uORB/uORB.h
  4. 47
      src/modules/uORB/uORBDevices_nuttx.cpp
  5. 13
      src/modules/uORB/uORBDevices_nuttx.hpp
  6. 47
      src/modules/uORB/uORBDevices_posix.cpp
  7. 13
      src/modules/uORB/uORBDevices_posix.hpp
  8. 15
      src/modules/uORB/uORBManager.cpp
  9. 11
      src/modules/uORB/uORBManager.hpp

3
src/drivers/drv_orb_dev.h

@ -87,4 +87,7 @@ @@ -87,4 +87,7 @@
/** Get the priority for the topic */
#define ORBIOCGPRIORITY _ORBIOC(14)
/** Set the queue size of the topic */
#define ORBIOCSETQUEUESIZE _ORBIOC(15)
#endif /* _DRV_UORB_H */

11
src/modules/uORB/uORB.cpp

@ -45,12 +45,23 @@ orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data) @@ -45,12 +45,23 @@ orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data)
return uORB::Manager::get_instance()->orb_advertise(meta, data);
}
orb_advert_t orb_advertise_queue(const struct orb_metadata *meta, const void *data, int queue_size)
{
return uORB::Manager::get_instance()->orb_advertise(meta, data, queue_size);
}
orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance,
int priority)
{
return uORB::Manager::get_instance()->orb_advertise_multi(meta, data, instance, priority);
}
orb_advert_t orb_advertise_multi_queue(const struct orb_metadata *meta, const void *data, int *instance,
int priority, int queue_size)
{
return uORB::Manager::get_instance()->orb_advertise_multi(meta, data, instance, priority, queue_size);
}
int orb_unadvertise(orb_advert_t handle)
{
return uORB::Manager::get_instance()->orb_unadvertise(handle);

12
src/modules/uORB/uORB.h

@ -140,12 +140,24 @@ typedef void *orb_advert_t; @@ -140,12 +140,24 @@ typedef void *orb_advert_t;
*/
extern orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data) __EXPORT;
/**
* @see uORB::Manager::orb_advertise()
*/
extern orb_advert_t orb_advertise_queue(const struct orb_metadata *meta, const void *data,
int queue_size) __EXPORT;
/**
* @see uORB::Manager::orb_advertise_multi()
*/
extern orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance,
int priority) __EXPORT;
/**
* @see uORB::Manager::orb_advertise_multi()
*/
extern orb_advert_t orb_advertise_multi_queue(const struct orb_metadata *meta, const void *data, int *instance,
int priority, int queue_size) __EXPORT;
/**
* @see uORB::Manager::orb_unadvertise()
*/

47
src/modules/uORB/uORBDevices_nuttx.cpp

@ -50,7 +50,8 @@ uORB::DeviceNode::DeviceNode @@ -50,7 +50,8 @@ uORB::DeviceNode::DeviceNode
const struct orb_metadata *meta,
const char *name,
const char *path,
int priority
int priority,
unsigned int queue_size
) :
CDev(name, path),
_meta(meta),
@ -60,6 +61,7 @@ uORB::DeviceNode::DeviceNode @@ -60,6 +61,7 @@ uORB::DeviceNode::DeviceNode
_publisher(0),
_priority(priority),
_published(false),
_queue_size(queue_size),
_IsRemoteSubscriberPresent(false),
_subscriber_count(0)
{
@ -185,13 +187,26 @@ uORB::DeviceNode::read(struct file *filp, char *buffer, size_t buflen) @@ -185,13 +187,26 @@ uORB::DeviceNode::read(struct file *filp, char *buffer, size_t buflen)
*/
irqstate_t flags = px4_enter_critical_section();
if (_generation > sd->generation + _queue_size) {
/* Reader is too far behind: some messages are lost */
sd->generation = _generation - _queue_size;
}
if (_generation == sd->generation && sd->generation > 0) {
/* The subscriber already read the latest message, but nothing new was published yet.
* Return the previous message
*/
--sd->generation;
}
/* if the caller doesn't want the data, don't give it to them */
if (nullptr != buffer) {
memcpy(buffer, _data, _meta->o_size);
memcpy(buffer, _data + (_meta->o_size * (sd->generation % _queue_size)), _meta->o_size);
}
/* track the last generation that the file has seen */
sd->generation = _generation;
if (sd->generation < _generation) {
++sd->generation;
}
/* set priority */
sd->priority = _priority;
@ -226,7 +241,7 @@ uORB::DeviceNode::write(struct file *filp, const char *buffer, size_t buflen) @@ -226,7 +241,7 @@ uORB::DeviceNode::write(struct file *filp, const char *buffer, size_t buflen)
/* re-check size */
if (nullptr == _data) {
_data = new uint8_t[_meta->o_size];
_data = new uint8_t[_meta->o_size * _queue_size];
}
unlock();
@ -245,10 +260,11 @@ uORB::DeviceNode::write(struct file *filp, const char *buffer, size_t buflen) @@ -245,10 +260,11 @@ uORB::DeviceNode::write(struct file *filp, const char *buffer, size_t buflen)
/* Perform an atomic copy. */
irqstate_t flags = px4_enter_critical_section();
memcpy(_data, buffer, _meta->o_size);
memcpy(_data + (_meta->o_size * (_generation % _queue_size)), buffer, _meta->o_size);
/* update the timestamp and generation count */
_last_update = hrt_absolute_time();
/* wrap-around happens after ~49 days, assuming a publisher rate of 1 kHz */
_generation++;
_published = true;
@ -290,6 +306,11 @@ uORB::DeviceNode::ioctl(struct file *filp, int cmd, unsigned long arg) @@ -290,6 +306,11 @@ uORB::DeviceNode::ioctl(struct file *filp, int cmd, unsigned long arg)
*(int *)arg = sd->priority;
return OK;
case ORBIOCSETQUEUESIZE:
//no need for locking here, since this is used only during the advertisement call,
//and only one advertiser is allowed to open the DeviceNode at the same time.
return update_queue_size(arg);
default:
/* give it to the superclass */
return CDev::ioctl(filp, cmd, arg);
@ -522,6 +543,20 @@ bool uORB::DeviceNode::is_published() @@ -522,6 +543,20 @@ bool uORB::DeviceNode::is_published()
return _published;
}
int uORB::DeviceNode::update_queue_size(unsigned int queue_size)
{
if (_queue_size == queue_size) {
return PX4_OK;
}
if (_data || _queue_size > queue_size) {
return ERROR;
}
_queue_size = queue_size;
return PX4_OK;
}
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
int16_t uORB::DeviceNode::process_add_subscription(int32_t rateInHz)

13
src/modules/uORB/uORBDevices_nuttx.hpp

@ -61,7 +61,8 @@ public: @@ -61,7 +61,8 @@ public:
const struct orb_metadata *meta,
const char *name,
const char *path,
int priority
int priority,
unsigned int queue_size = 1
);
/**
@ -168,6 +169,15 @@ public: @@ -168,6 +169,15 @@ public:
* and publish to this node or if another node should be tried. */
bool is_published();
/**
* Try to change the size of the queue. This can only be done as long as nobody published yet.
* This is the case, for example when orb_subscribe was called before an orb_advertise.
* The queue size can only be increased.
* @param queue_size new size of the queue
* @return PX4_OK if queue size successfully set
*/
int update_queue_size(unsigned int queue_size);
protected:
virtual pollevent_t poll_state(struct file *filp);
virtual void poll_notify_one(struct pollfd *fds, pollevent_t events);
@ -190,6 +200,7 @@ private: @@ -190,6 +200,7 @@ private:
We allow one publisher to have an open file descriptor at the same time. */
const int _priority; /**< priority of topic */
bool _published; /**< has ever data been published */
unsigned int _queue_size; /**< maximum number of elements in the queue */
private: // private class methods.

47
src/modules/uORB/uORBDevices_posix.cpp

@ -61,7 +61,8 @@ uORB::DeviceNode::SubscriberData *uORB::DeviceNode::filp_to_sd(device::file_t * @@ -61,7 +61,8 @@ uORB::DeviceNode::SubscriberData *uORB::DeviceNode::filp_to_sd(device::file_t *
return sd;
}
uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const char *name, const char *path, int priority) :
uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const char *name, const char *path,
int priority, unsigned int queue_size) :
VDev(name, path),
_meta(meta),
_data(nullptr),
@ -70,6 +71,7 @@ uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const char *name, @@ -70,6 +71,7 @@ uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const char *name,
_publisher(0),
_priority(priority),
_published(false),
_queue_size(queue_size),
_subscriber_count(0)
{
// enable debug() calls
@ -198,13 +200,26 @@ uORB::DeviceNode::read(device::file_t *filp, char *buffer, size_t buflen) @@ -198,13 +200,26 @@ uORB::DeviceNode::read(device::file_t *filp, char *buffer, size_t buflen)
*/
lock();
if (_generation > sd->generation + _queue_size) {
/* Reader is too far behind: some messages are lost */
sd->generation = _generation - _queue_size;
}
if (_generation == sd->generation && sd->generation > 0) {
/* The subscriber already read the latest message, but nothing new was published yet.
* Return the previous message
*/
--sd->generation;
}
/* if the caller doesn't want the data, don't give it to them */
if (nullptr != buffer) {
memcpy(buffer, _data, _meta->o_size);
memcpy(buffer, _data + (_meta->o_size * (sd->generation % _queue_size)), _meta->o_size);
}
/* track the last generation that the file has seen */
sd->generation = _generation;
if (sd->generation < _generation) {
++sd->generation;
}
/* set priority */
sd->priority = _priority;
@ -238,7 +253,7 @@ uORB::DeviceNode::write(device::file_t *filp, const char *buffer, size_t buflen) @@ -238,7 +253,7 @@ uORB::DeviceNode::write(device::file_t *filp, const char *buffer, size_t buflen)
/* re-check size */
if (nullptr == _data) {
_data = new uint8_t[_meta->o_size];
_data = new uint8_t[_meta->o_size * _queue_size];
}
unlock();
@ -255,10 +270,11 @@ uORB::DeviceNode::write(device::file_t *filp, const char *buffer, size_t buflen) @@ -255,10 +270,11 @@ uORB::DeviceNode::write(device::file_t *filp, const char *buffer, size_t buflen)
}
lock();
memcpy(_data, buffer, _meta->o_size);
memcpy(_data + (_meta->o_size * (_generation % _queue_size)), buffer, _meta->o_size);
/* update the timestamp and generation count */
_last_update = hrt_absolute_time();
/* wrap-around happens after ~49 days, assuming a publisher rate of 1 kHz */
_generation++;
_published = true;
@ -305,6 +321,11 @@ uORB::DeviceNode::ioctl(device::file_t *filp, int cmd, unsigned long arg) @@ -305,6 +321,11 @@ uORB::DeviceNode::ioctl(device::file_t *filp, int cmd, unsigned long arg)
*(int *)arg = sd->priority;
return PX4_OK;
case ORBIOCSETQUEUESIZE:
//no need for locking here, since this is used only during the advertisement call,
//and only one advertiser is allowed to open the DeviceNode at the same time.
return update_queue_size(arg);
default:
/* give it to the superclass */
return VDev::ioctl(filp, cmd, arg);
@ -527,6 +548,20 @@ bool uORB::DeviceNode::is_published() @@ -527,6 +548,20 @@ bool uORB::DeviceNode::is_published()
return _published;
}
int uORB::DeviceNode::update_queue_size(unsigned int queue_size)
{
if (_queue_size == queue_size) {
return PX4_OK;
}
if (_data || _queue_size > queue_size) {
return ERROR;
}
_queue_size = queue_size;
return PX4_OK;
}
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
int16_t uORB::DeviceNode::process_add_subscription(int32_t rateInHz)

13
src/modules/uORB/uORBDevices_posix.hpp

@ -48,7 +48,8 @@ class DeviceMaster; @@ -48,7 +48,8 @@ class DeviceMaster;
class uORB::DeviceNode : public device::VDev
{
public:
DeviceNode(const struct orb_metadata *meta, const char *name, const char *path, int priority);
DeviceNode(const struct orb_metadata *meta, const char *name, const char *path,
int priority, unsigned int queue_size = 1);
~DeviceNode();
virtual int open(device::file_t *filp);
@ -105,6 +106,15 @@ public: @@ -105,6 +106,15 @@ public:
* and publish to this node or if another node should be tried. */
bool is_published();
/**
* Try to change the size of the queue. This can only be done as long as nobody published yet.
* This is the case, for example when orb_subscribe was called before an orb_advertise.
* The queue size can only be increased.
* @param queue_size new size of the queue
* @return PX4_OK if queue size successfully set
*/
int update_queue_size(unsigned int queue_size);
protected:
virtual pollevent_t poll_state(device::file_t *filp);
virtual void poll_notify_one(px4_pollfd_struct_t *fds, pollevent_t events);
@ -128,6 +138,7 @@ private: @@ -128,6 +138,7 @@ private:
We allow one publisher to have an open file descriptor at the same time. */
const int _priority; /**< priority of topic */
bool _published; /**< has ever data been published */
unsigned int _queue_size; /**< maximum number of elements in the queue */
static SubscriberData *filp_to_sd(device::file_t *filp);

15
src/modules/uORB/uORBManager.cpp

@ -87,14 +87,14 @@ int uORB::Manager::orb_exists(const struct orb_metadata *meta, int instance) @@ -87,14 +87,14 @@ int uORB::Manager::orb_exists(const struct orb_metadata *meta, int instance)
#endif
}
orb_advert_t uORB::Manager::orb_advertise(const struct orb_metadata *meta, const void *data)
orb_advert_t uORB::Manager::orb_advertise(const struct orb_metadata *meta, const void *data, int queue_size)
{
//warnx("orb_advertise meta = %p", meta);
return orb_advertise_multi(meta, data, nullptr, ORB_PRIO_DEFAULT);
return orb_advertise_multi(meta, data, nullptr, ORB_PRIO_DEFAULT, queue_size);
}
orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance,
int priority)
int priority, int queue_size)
{
int result, fd;
orb_advert_t advertiser;
@ -109,6 +109,15 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, @@ -109,6 +109,15 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta,
return nullptr;
}
/* Set the queue size. This must be done before the first publication; thus it fails if
* this is not the first advertiser.
*/
result = px4_ioctl(fd, ORBIOCSETQUEUESIZE, (unsigned long)queue_size);
if (result < 0 && queue_size > 1) {
PX4_WARN("orb_advertise_multi: failed to set queue size");
}
/* get the advertiser handle and close the node */
result = px4_ioctl(fd, ORBIOCGADVERTISER, (unsigned long)&advertiser);
px4_close(fd);

11
src/modules/uORB/uORBManager.hpp

@ -96,13 +96,15 @@ public: @@ -96,13 +96,15 @@ public:
* @param data A pointer to the initial data to be published.
* For topics updated by interrupt handlers, the advertisement
* must be performed from non-interrupt context.
* @param queue_size Maximum number of buffered elements. If this is 1, no queuing is
* used.
* @return nullptr on error, otherwise returns an object pointer
* that can be used to publish to the topic.
* If the topic in question is not known (due to an
* ORB_DEFINE with no corresponding ORB_DECLARE)
* this function will return nullptr and set errno to ENOENT.
*/
orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data);
orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data, int queue_size = 1);
/**
* Advertise as the publisher of a topic.
@ -130,6 +132,8 @@ public: @@ -130,6 +132,8 @@ public:
* instances, the priority allows the subscriber to prioritize the best
* data source as long as its available. The subscriber is responsible to check
* and handle different priorities (@see orb_priority()).
* @param queue_size Maximum number of buffered elements. If this is 1, no queuing is
* used.
* @return ERROR on error, otherwise returns a handle
* that can be used to publish to the topic.
* If the topic in question is not known (due to an
@ -137,7 +141,7 @@ public: @@ -137,7 +141,7 @@ public:
* this function will return -1 and set errno to ENOENT.
*/
orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance,
int priority) ;
int priority, int queue_size = 1) ;
/**
@ -272,7 +276,8 @@ public: @@ -272,7 +276,8 @@ public:
int orb_check(int handle, bool *updated) ;
/**
* Return the last time that the topic was updated.
* Return the last time that the topic was updated. If a queue is used, it returns
* the timestamp of the latest element in the queue.
*
* @param handle A handle returned from orb_subscribe.
* @param time Returns the absolute time that the topic was updated, or zero if it has

Loading…
Cancel
Save