Browse Source

uORB::DeviceNode replace SubscriptionData with uORB::SubscriptionInterval

sbg
Daniel Agar 5 years ago
parent
commit
372866a22b
  1. 7
      src/lib/flight_tasks/tasks/Offboard/FlightTaskOffboard.cpp
  2. 10
      src/modules/uORB/Subscription.cpp
  3. 8
      src/modules/uORB/Subscription.hpp
  4. 186
      src/modules/uORB/uORBDeviceNode.cpp
  5. 36
      src/modules/uORB/uORBDeviceNode.hpp

7
src/lib/flight_tasks/tasks/Offboard/FlightTaskOffboard.cpp

@ -45,11 +45,6 @@ bool FlightTaskOffboard::updateInitialize() @@ -45,11 +45,6 @@ bool FlightTaskOffboard::updateInitialize()
{
bool ret = FlightTask::updateInitialize();
_sub_triplet_setpoint.update();
// require a valid triplet
ret = ret && _sub_triplet_setpoint.get().current.valid;
// require valid position / velocity in xy
return ret && PX4_ISFINITE(_position(0))
&& PX4_ISFINITE(_position(1))
@ -73,6 +68,8 @@ bool FlightTaskOffboard::update() @@ -73,6 +68,8 @@ bool FlightTaskOffboard::update()
// reset setpoint for every loop
_resetSetpoints();
_sub_triplet_setpoint.update();
if (!_sub_triplet_setpoint.get().current.valid) {
_setDefaultConstraints();
_position_setpoint = _position;

10
src/modules/uORB/Subscription.cpp

@ -65,16 +65,10 @@ bool Subscription::subscribe() @@ -65,16 +65,10 @@ bool Subscription::subscribe()
_node = node;
_node->add_internal_subscriber();
// If there were any previous publications, allow the subscriber to read them
const unsigned curr_gen = _node->published_message_count();
const uint8_t q_size = _node->get_queue_size();
if (q_size < curr_gen) {
_last_generation = curr_gen - q_size;
} else {
_last_generation = 0;
}
// If there were any previous publications allow the subscriber to read them
_last_generation = curr_gen - math::min((unsigned)_node->get_queue_size(), curr_gen);
return true;
}

8
src/modules/uORB/Subscription.hpp

@ -68,6 +68,7 @@ public: @@ -68,6 +68,7 @@ public:
_orb_id(id),
_instance(instance)
{
subscribe();
}
/**
@ -80,6 +81,7 @@ public: @@ -80,6 +81,7 @@ public:
_orb_id((meta == nullptr) ? ORB_ID::INVALID : static_cast<ORB_ID>(meta->o_id)),
_instance(instance)
{
subscribe();
}
~Subscription()
@ -111,19 +113,19 @@ public: @@ -111,19 +113,19 @@ public:
/**
* Check if there is a new update.
* */
bool updated() { return advertised() ? (_node->published_message_count() != _last_generation) : false; }
bool updated() { return advertised() && (_node->published_message_count() != _last_generation); }
/**
* Update the struct
* @param dst The uORB message struct we are updating.
*/
bool update(void *dst) { return updated() ? copy(dst) : false; }
bool update(void *dst) { return updated() && _node->copy(dst, _last_generation); }
/**
* Copy the struct
* @param dst The uORB message struct we are updating.
*/
bool copy(void *dst) { return advertised() ? _node->copy(dst, _last_generation) : false; }
bool copy(void *dst) { return advertised() && _node->copy(dst, _last_generation); }
uint8_t get_instance() const { return _instance; }
unsigned get_last_generation() const { return _last_generation; }

186
src/modules/uORB/uORBDeviceNode.cpp

@ -42,6 +42,8 @@ @@ -42,6 +42,8 @@
#include "uORBCommunicator.hpp"
#endif /* ORB_COMMUNICATOR */
static uORB::SubscriptionInterval *filp_to_subscription(cdev::file_t *filp) { return static_cast<uORB::SubscriptionInterval *>(filp->f_priv); }
uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const uint8_t instance, const char *path,
ORB_PRIO priority, uint8_t queue_size) :
CDev(path),
@ -77,22 +79,16 @@ uORB::DeviceNode::open(cdev::file_t *filp) @@ -77,22 +79,16 @@ uORB::DeviceNode::open(cdev::file_t *filp)
if (filp->f_oflags == PX4_F_RDONLY) {
/* allocate subscriber data */
SubscriberData *sd = new SubscriberData{};
SubscriptionInterval *sd = new SubscriptionInterval(_meta, 0, _instance);
if (nullptr == sd) {
return -ENOMEM;
}
/* If there were any previous publications, allow the subscriber to read them */
const unsigned gen = published_message_count();
sd->generation = gen - (_queue_size < gen ? _queue_size : gen);
filp->f_priv = (void *)sd;
int ret = CDev::open(filp);
add_internal_subscriber();
if (ret != PX4_OK) {
PX4_ERR("CDev::open failed");
delete sd;
@ -113,93 +109,63 @@ int @@ -113,93 +109,63 @@ int
uORB::DeviceNode::close(cdev::file_t *filp)
{
if (filp->f_oflags == PX4_F_RDONLY) { /* subscriber */
SubscriberData *sd = static_cast<SubscriberData *>(filp->f_priv);
if (sd != nullptr) {
remove_internal_subscriber();
delete sd;
sd = nullptr;
}
SubscriptionInterval *sd = filp_to_subscription(filp);
delete sd;
}
return CDev::close(filp);
}
bool
uORB::DeviceNode::copy_locked(void *dst, unsigned &generation) const
uORB::DeviceNode::copy(void *dst, unsigned &generation)
{
bool updated = false;
if ((dst != nullptr) && (_data != nullptr)) {
const unsigned current_generation = _generation.load();
if (current_generation > generation + _queue_size) {
// Reader is too far behind: some messages are lost
generation = current_generation - _queue_size;
}
if ((current_generation == generation) && (generation > 0)) {
/* The subscriber already read the latest message, but nothing new was published yet.
* Return the previous message
*/
--generation;
}
memcpy(dst, _data + (_meta->o_size * (generation % _queue_size)), _meta->o_size);
if (_queue_size == 1) {
ATOMIC_ENTER;
memcpy(dst, _data, _meta->o_size);
generation = _generation.load();
ATOMIC_LEAVE;
return true;
if (generation < current_generation) {
++generation;
}
} else {
ATOMIC_ENTER;
const unsigned current_generation = _generation.load();
updated = true;
}
if (current_generation > generation + _queue_size) {
// Reader is too far behind: some messages are lost
generation = current_generation - _queue_size;
}
return updated;
}
if ((current_generation == generation) && (generation > 0)) {
/* The subscriber already read the latest message, but nothing new was published yet.
* Return the previous message
*/
--generation;
}
bool
uORB::DeviceNode::copy(void *dst, unsigned &generation)
{
ATOMIC_ENTER;
memcpy(dst, _data + (_meta->o_size * (generation % _queue_size)), _meta->o_size);
ATOMIC_LEAVE;
bool updated = copy_locked(dst, generation);
if (generation < current_generation) {
++generation;
}
ATOMIC_LEAVE;
return true;
}
}
return updated;
return false;
}
ssize_t
uORB::DeviceNode::read(cdev::file_t *filp, char *buffer, size_t buflen)
{
/* if the object has not been written yet, return zero */
if (_data == nullptr) {
return 0;
}
/* if the caller's buffer is the wrong size, that's an error */
if (buflen != _meta->o_size) {
return -EIO;
}
SubscriberData *sd = static_cast<SubscriberData *>(filp->f_priv);
/*
* Perform an atomic copy & state update
*/
ATOMIC_ENTER;
// if subscriber has an interval track the last update time
if (sd->update_interval) {
sd->update_interval->last_update = hrt_absolute_time();
}
copy_locked(buffer, sd->generation);
ATOMIC_LEAVE;
return _meta->o_size;
return filp_to_subscription(filp)->copy(buffer) ? _meta->o_size : 0;
}
ssize_t
@ -272,42 +238,14 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg) @@ -272,42 +238,14 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg)
switch (cmd) {
case ORBIOCUPDATED: {
ATOMIC_ENTER;
*(bool *)arg = appears_updated(filp);
*(bool *)arg = filp_to_subscription(filp)->updated();
ATOMIC_LEAVE;
return PX4_OK;
}
case ORBIOCSETINTERVAL: {
int ret = PX4_OK;
lock();
SubscriberData *sd = static_cast<SubscriberData *>(filp->f_priv);
if (arg == 0) {
if (sd->update_interval) {
delete (sd->update_interval);
sd->update_interval = nullptr;
}
} else {
if (sd->update_interval) {
sd->update_interval->interval = arg;
} else {
sd->update_interval = new UpdateIntervalData();
if (sd->update_interval) {
sd->update_interval->interval = arg;
} else {
ret = -ENOMEM;
}
}
}
unlock();
return ret;
}
case ORBIOCSETINTERVAL:
filp_to_subscription(filp)->set_interval_us(arg);
return PX4_OK;
case ORBIOCGADVERTISER:
*(uintptr_t *)arg = (uintptr_t)this;
@ -324,23 +262,14 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg) @@ -324,23 +262,14 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg)
return ret;
}
case ORBIOCGETINTERVAL: {
SubscriberData *sd = static_cast<SubscriberData *>(filp->f_priv);
if (sd->update_interval) {
*(unsigned *)arg = sd->update_interval->interval;
} else {
*(unsigned *)arg = 0;
}
}
return OK;
case ORBIOCGETINTERVAL:
*(unsigned *)arg = filp_to_subscription(filp)->get_interval_us();
return PX4_OK;
case ORBIOCISADVERTISED:
*(unsigned long *)arg = _advertised;
return OK;
return PX4_OK;
default:
/* give it to the superclass */
@ -450,43 +379,18 @@ px4_pollevent_t @@ -450,43 +379,18 @@ px4_pollevent_t
uORB::DeviceNode::poll_state(cdev::file_t *filp)
{
// If the topic appears updated to the subscriber, say so.
if (appears_updated(filp)) {
return POLLIN;
}
return 0;
return filp_to_subscription(filp)->updated() ? POLLIN : 0;
}
void
uORB::DeviceNode::poll_notify_one(px4_pollfd_struct_t *fds, px4_pollevent_t events)
{
// If the topic looks updated to the subscriber, go ahead and notify them.
if (appears_updated((cdev::file_t *)fds->priv)) {
if (filp_to_subscription((cdev::file_t *)fds->priv)->updated()) {
CDev::poll_notify_one(fds, events);
}
}
bool
uORB::DeviceNode::appears_updated(cdev::file_t *filp)
{
// check if this topic has been published yet, if not bail out
if (_data == nullptr) {
return false;
}
SubscriberData *sd = static_cast<SubscriberData *>(filp->f_priv);
// if subscriber has interval check time since last update
if (sd->update_interval != nullptr) {
if (hrt_elapsed_time(&sd->update_interval->last_update) < sd->update_interval->interval) {
return false;
}
}
// finally, compare the generation
return (sd->generation != published_message_count());
}
bool
uORB::DeviceNode::print_statistics(int max_topic_length)
{

36
src/modules/uORB/uORBDeviceNode.hpp

@ -229,31 +229,6 @@ protected: @@ -229,31 +229,6 @@ protected:
private:
/**
* Copies data and the corresponding generation
* from a node to the buffer provided. Caller handles locking.
*
* @param dst
* The buffer into which the data is copied.
* @param generation
* The generation that was copied.
* @return bool
* Returns true if the data was copied.
*/
bool copy_locked(void *dst, unsigned &generation) const;
struct UpdateIntervalData {
uint64_t last_update{0}; /**< time at which the last update was provided, used when update_interval is nonzero */
unsigned interval{0}; /**< if nonzero minimum interval between updates */
};
struct SubscriberData {
~SubscriberData() { if (update_interval) { delete (update_interval); } }
unsigned generation{0}; /**< last generation the subscriber has seen */
UpdateIntervalData *update_interval{nullptr}; /**< if null, no update interval */
};
const orb_metadata *_meta; /**< object metadata information */
uint8_t *_data{nullptr}; /**< allocated object buffer */
@ -265,15 +240,4 @@ private: @@ -265,15 +240,4 @@ private:
bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */
uint8_t _queue_size; /**< maximum number of elements in the queue */
int8_t _subscriber_count{0};
/**
* Check whether a topic appears updated to a subscriber.
*
* Lock must already be held when calling this.
*
* @param sd The subscriber for whom to check.
* @return True if the topic should appear updated to the subscriber
*/
bool appears_updated(cdev::file_t *filp);
};

Loading…
Cancel
Save