diff --git a/src/modules/uORB/SubscriptionBlocking.hpp b/src/modules/uORB/SubscriptionBlocking.hpp new file mode 100644 index 0000000000..3d9de3f023 --- /dev/null +++ b/src/modules/uORB/SubscriptionBlocking.hpp @@ -0,0 +1,152 @@ +/**************************************************************************** + * + * Copyright (c) 2019 PX4 Development Team. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name PX4 nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +#pragma once + +#include "SubscriptionCallback.hpp" + +#include +#include + +namespace uORB +{ + +// Subscription with callback that schedules a WorkItem +template +class SubscriptionBlocking : public SubscriptionCallback +{ +public: + /** + * Constructor + * + * @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic. + * @param interval_us The requested maximum update interval in microseconds. + * @param instance The instance for multi sub. + */ + SubscriptionBlocking(const orb_metadata *meta, uint32_t interval_us = 0, uint8_t instance = 0) : + SubscriptionCallback(meta, interval_us, instance) + { + // pthread_mutexattr_init + pthread_mutexattr_t attr; + int ret_attr_init = pthread_mutexattr_init(&attr); + + if (ret_attr_init != 0) { + PX4_ERR("pthread_mutexattr_init failed, status=%d", ret_attr_init); + } + + // pthread_mutexattr_settype + int ret_mutexattr_settype = pthread_mutexattr_settype(&attr, PTHREAD_PRIO_NONE); + + if (ret_mutexattr_settype != 0) { + PX4_ERR("pthread_mutexattr_settype failed, status=%d", ret_mutexattr_settype); + } + + // pthread_mutex_init + int ret_mutex_init = pthread_mutex_init(&_mutex, &attr); + + if (ret_mutex_init != 0) { + PX4_ERR("pthread_mutex_init failed, status=%d", ret_mutex_init); + } + } + + virtual ~SubscriptionBlocking() + { + pthread_mutex_destroy(&_mutex); + pthread_cond_destroy(&_cv); + } + + void call() override + { + // signal immediately if no interval, otherwise only if interval has elapsed + if ((_interval_us == 0) || (hrt_elapsed_time(&_last_update) >= _interval_us)) { + pthread_cond_signal(&_cv); + } + } + + /** + * Copy the struct if updated. + * @param data The data reference where the struct will be copied. + * @param timeout_us The requested timeout in microseconds, or 0 to wait indefinitely. + * + * @return true only if topic was updated and copied successfully. + */ + bool updateBlocking(T &data, uint32_t timeout_us = 0) + { + if (!_registered) { + registerCallback(); + } + + if (updated()) { + // copy immediately if updated + return copy(&data); + + } else { + // otherwise wait + + LockGuard lg{_mutex}; + + if (timeout_us == 0) { + // wait with no timeout + if (pthread_cond_wait(&_cv, &_mutex) == 0) { + return update(&data); + } + + } else { + // otherwise wait with timeout based on interval + + // Calculate an absolute time in the future + struct timespec ts; + px4_clock_gettime(CLOCK_REALTIME, &ts); + uint64_t nsecs = ts.tv_nsec + (timeout_us * 1000); + static constexpr unsigned billion = (1000 * 1000 * 1000); + ts.tv_sec += nsecs / billion; + nsecs -= (nsecs / billion) * billion; + ts.tv_nsec = nsecs; + + if (px4_pthread_cond_timedwait(&_cv, &_mutex, &ts) == 0) { + return update(&data); + } + } + } + + return false; + } + +private: + + pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t _cv = PTHREAD_COND_INITIALIZER; + +}; + +} // namespace uORB diff --git a/src/modules/uORB/SubscriptionCallback.hpp b/src/modules/uORB/SubscriptionCallback.hpp index 4578f10448..f58f6e84d7 100644 --- a/src/modules/uORB/SubscriptionCallback.hpp +++ b/src/modules/uORB/SubscriptionCallback.hpp @@ -53,11 +53,11 @@ public: * Constructor * * @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic. - * @param interval_ms The requested maximum update interval in milliseconds. + * @param interval_us The requested maximum update interval in microseconds. * @param instance The instance for multi sub. */ - SubscriptionCallback(const orb_metadata *meta, uint8_t interval_ms = 0, uint8_t instance = 0) : - SubscriptionInterval(meta, interval_ms, instance) + SubscriptionCallback(const orb_metadata *meta, uint32_t interval_us = 0, uint8_t instance = 0) : + SubscriptionInterval(meta, interval_us, instance) { } @@ -68,11 +68,9 @@ public: bool registerCallback() { - bool ret = false; - if (_subscription.get_node() && _subscription.get_node()->register_callback(this)) { // registered - ret = true; + _registered = true; } else { // force topic creation by subscribing with old API @@ -81,15 +79,14 @@ public: // try to register callback again if (_subscription.subscribe()) { if (_subscription.get_node() && _subscription.get_node()->register_callback(this)) { - ret = true; + _registered = true; } } orb_unsubscribe(fd); } - - return ret; + return _registered; } void unregisterCallback() @@ -97,10 +94,16 @@ public: if (_subscription.get_node()) { _subscription.get_node()->unregister_callback(this); } + + _registered = false; } virtual void call() = 0; +protected: + + bool _registered{false}; + }; // Subscription with callback that schedules a WorkItem