From d31b7feb313acfc055e917301c24ab9d642d47f8 Mon Sep 17 00:00:00 2001 From: TSC21 Date: Mon, 12 Jul 2021 10:10:18 +0200 Subject: [PATCH] microRTPS: agent: publish timesync status --- .../uorb_microcdr/microRTPS_client.cpp.em | 4 +- msg/templates/urtps/RtpsTopics.cpp.em | 2 + msg/templates/urtps/Subscriber.cpp.em | 2 +- msg/templates/urtps/microRTPS_timesync.cpp.em | 55 ++++++++++++++---- msg/templates/urtps/microRTPS_timesync.h.em | 58 +++++++++++++++++-- msg/timesync_status.msg | 2 +- msg/tools/uorb_rtps_message_ids.yaml | 1 + 7 files changed, 104 insertions(+), 20 deletions(-) diff --git a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em index da443bbc10..d611e720b8 100644 --- a/msg/templates/uorb_microcdr/microRTPS_client.cpp.em +++ b/msg/templates/uorb_microcdr/microRTPS_client.cpp.em @@ -69,7 +69,7 @@ receive_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] = #include #include -#include +#include #include @[for topic in list(set(topic_names))]@ #include @@ -82,7 +82,7 @@ using namespace time_literals; // Publishers for received messages struct RcvTopicsPubs { @[ for idx, topic in enumerate(recv_topics)]@ - uORB::Publication <@(receive_base_types[idx])_s> @(topic)_pub{ORB_ID(@(topic))}; + uORB::PublicationMulti <@(receive_base_types[idx])_s> @(topic)_pub{ORB_ID(@(topic))}; @[ end for]@ }; @[end if]@ diff --git a/msg/templates/urtps/RtpsTopics.cpp.em b/msg/templates/urtps/RtpsTopics.cpp.em index 4de51ae9a7..f1a38ac5d2 100644 --- a/msg/templates/urtps/RtpsTopics.cpp.em +++ b/msg/templates/urtps/RtpsTopics.cpp.em @@ -84,6 +84,8 @@ bool RtpsTopics::init(std::condition_variable *t_send_queue_cv, std::mutex *t_se std::cout << "- @(topic) publisher started" << std::endl; @[ if topic == 'Timesync' or topic == 'timesync']@ _timesync->start(&_@(topic)_pub); +@[ elif topic == 'TimesyncStatus' or topic == 'timesync_status']@ + _timesync->init_status_pub(&_@(topic)_pub); @[ end if]@ } else { diff --git a/msg/templates/urtps/Subscriber.cpp.em b/msg/templates/urtps/Subscriber.cpp.em index aba50ce346..69ee1d8138 100644 --- a/msg/templates/urtps/Subscriber.cpp.em +++ b/msg/templates/urtps/Subscriber.cpp.em @@ -188,7 +188,7 @@ void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber *sub, Ma { @# Since the time sync runs on the bridge itself, it is required that there is a @# match between two topics of the same entity -@[if topic != 'Timesync' and topic != 'timesync']@ +@[if topic != 'Timesync' and topic != 'timesync' and topic != 'TimesyncStatus' and topic != 'timesync_status']@ // The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain // are the same for all its subcomponents (publishers, subscribers) bool is_different_endpoint = false; diff --git a/msg/templates/urtps/microRTPS_timesync.cpp.em b/msg/templates/urtps/microRTPS_timesync.cpp.em index 5d7f902463..7b15cd8bb2 100644 --- a/msg/templates/urtps/microRTPS_timesync.cpp.em +++ b/msg/templates/urtps/microRTPS_timesync.cpp.em @@ -107,6 +107,21 @@ void TimeSync::start(TimesyncPublisher *pub) _send_timesync_thread.reset(new std::thread(run)); } +void TimeSync::init_status_pub(TimesyncStatusPublisher *status_pub) +{ + auto run = [this, status_pub]() { + while (!_request_stop) { + timesync_status_msg_t status_msg = newTimesyncStatusMsg(); + + status_pub->publish(&status_msg); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }; + _request_stop = false; + _send_timesync_status_thread.reset(new std::thread(run)); +} + void TimeSync::stop() { _request_stop = true; @@ -115,6 +130,7 @@ void TimeSync::stop() if (_timesync_node_thread && _timesync_node_thread->joinable()) { _timesync_node_thread->join(); } @[end if]@ if (_send_timesync_thread && _send_timesync_thread->joinable()) { _send_timesync_thread->join(); } + if (_send_timesync_status_thread && _send_timesync_status_thread->joinable()) { _send_timesync_status_thread->join(); } } void TimeSync::reset() @@ -124,23 +140,23 @@ void TimeSync::reset() } @[if ros2_distro]@ -int64_t TimeSync::getROSTimeNSec() +uint64_t TimeSync::getROSTimeNSec() const { return _timesync_node->now().nanoseconds(); } -int64_t TimeSync::getROSTimeUSec() +uint64_t TimeSync::getROSTimeUSec() const { return RCL_NS_TO_US(getROSTimeNSec()); } @[else]@ -int64_t TimeSync::getSteadyTimeNSec() +uint64_t TimeSync::getSteadyTimeNSec() const { auto time = std::chrono::steady_clock::now(); return std::chrono::time_point_cast(time).time_since_epoch().count(); } -int64_t TimeSync::getSteadyTimeUSec() +uint64_t TimeSync::getSteadyTimeUSec() const { auto time = std::chrono::steady_clock::now(); return std::chrono::time_point_cast(time).time_since_epoch().count(); @@ -149,10 +165,11 @@ int64_t TimeSync::getSteadyTimeUSec() bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns) { - int64_t rtti = local_t3_ns - local_t1_ns; + _rtti = local_t3_ns - local_t1_ns; + _remote_time_stamp = remote_t2_ns; // assume rtti is evenly split both directions - int64_t remote_t3_ns = remote_t2_ns + rtti / 2ll; + int64_t remote_t3_ns = remote_t2_ns + _rtti.load() / 2ll; int64_t measurement_offset = remote_t3_ns - local_t3_ns; @@ -181,8 +198,8 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t } // ignore if rtti > 50ms - if (rtti > 50ll * 1000ll * 1000ll) { - if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tRTTI too high for timesync: " << rtti / (1000ll * 1000ll) << "ms\033[0m" << std::endl; } + if (_rtti.load() > 50ll * 1000ll * 1000ll) { + if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tRTTI too high for timesync: " << _rtti.load() / (1000ll * 1000ll) << "ms\033[0m" << std::endl; } return false; } @@ -197,11 +214,11 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t beta = (1. - s) * BETA_INITIAL + s * BETA_FINAL; } - int64_t offset_prev = _offset_ns.load(); + _offset_prev = _offset_ns.load(); updateOffset(static_cast((_skew_ns_per_sync + _offset_ns.load()) * (1. - alpha) + measurement_offset * alpha)); _skew_ns_per_sync = - static_cast(beta * (_offset_ns.load() - offset_prev) + (1. - beta) * _skew_ns_per_sync); + static_cast(beta * (_offset_ns.load() - _offset_prev.load()) + (1. - beta) * _skew_ns_per_sync); _num_samples++; @@ -263,3 +280,21 @@ timesync_msg_t TimeSync::newTimesyncMsg() return msg; } + +timesync_status_msg_t TimeSync::newTimesyncStatusMsg() +{ + timesync_status_msg_t msg{}; + +@[if ros2_distro]@ + setMsgTimestamp(&msg, getROSTimeUSec()); +@[else]@ + setMsgTimestamp(&msg, getSteadyTimeUSec()); +@[end if]@ + setMsgSourceProtocol(&msg, 1); // SOURCE_PROTOCOL_RTPS + setMsgRemoteTimeStamp(&msg, _remote_time_stamp.load() / 1000ULL); + setMsgObservedOffset(&msg, _offset_prev.load()); + setMsgEstimatedOffset(&msg, _offset_ns.load()); + setMsgRoundTripTime(&msg, _rtti.load() / 1000ll); + + return msg; +} diff --git a/msg/templates/urtps/microRTPS_timesync.h.em b/msg/templates/urtps/microRTPS_timesync.h.em index b029dbc8d3..e272cf67db 100644 --- a/msg/templates/urtps/microRTPS_timesync.h.em +++ b/msg/templates/urtps/microRTPS_timesync.h.em @@ -71,12 +71,14 @@ except AttributeError: @[if ros2_distro]@ #include "Timesync_Publisher.h" +#include "TimesyncStatus_Publisher.h" #include #include #include @[else]@ #include "timesync_Publisher.h" +#include "timesync_status_Publisher.h" @[end if]@ static constexpr double ALPHA_INITIAL = 0.05; @@ -92,21 +94,27 @@ static constexpr int REQUEST_RESET_COUNTER_THRESHOLD = 5; @[if version.parse(fastrtps_version) <= version.parse('1.7.2')]@ @[ if ros2_distro]@ using timesync_msg_t = @(package)::msg::dds_::Timesync_; +using timesync_status_msg_t = @(package)::msg::dds_::TimesyncStatus_; @[ else]@ using timesync_msg_t = timesync_; +using timesync_status_msg_t = timesync_status_; @[ end if]@ @[else]@ @[ if ros2_distro]@ using timesync_msg_t = @(package)::msg::Timesync; +using timesync_status_msg_t = @(package)::msg::TimesyncStatus; @[ else]@ using timesync_msg_t = timesync; +using timesync_status_msg_t = timesync_status; @[ end if]@ @[end if]@ @# Sets the timesync publisher entity depending on using ROS2 or not @[if ros2_distro]@ using TimesyncPublisher = Timesync_Publisher; +using TimesyncStatusPublisher = TimesyncStatus_Publisher; @[else]@ using TimesyncPublisher = timesync_Publisher; +using TimesyncStatusPublisher = timesync_status_Publisher; @[end if]@ class TimeSync @@ -121,6 +129,12 @@ public: */ void start(TimesyncPublisher *pub); + /** + * @@brief Init and run the timesync status publisher thread + * @@param[in] pub The timesync status publisher entity to use + */ + void init_status_pub(TimesyncStatusPublisher *status_pub); + /** * @@brief Resets the filter */ @@ -140,26 +154,26 @@ public: * https://design.ros2.org/articles/clock_and_time.html * @@return ROS time in nanoseconds */ - int64_t getROSTimeNSec(); + uint64_t getROSTimeNSec() const; /** * @@brief Get ROS time in microseconds. Fetches the time from getROSTimeNSec() * and converts it to microseconds * @@return ROS time in microseconds */ - int64_t getROSTimeUSec(); + uint64_t getROSTimeUSec() const; @[else]@ /** * @@brief Get clock monotonic time (raw) in nanoseconds * @@return Steady CLOCK_MONOTONIC time in nanoseconds */ - int64_t getSteadyTimeNSec(); + uint64_t getSteadyTimeNSec() const; /** * @@brief Get clock monotonic time (raw) in microseconds * @@return Steady CLOCK_MONOTONIC time in microseconds */ - int64_t getSteadyTimeUSec(); + uint64_t getSteadyTimeUSec() const; @[end if]@ /** @@ -183,6 +197,12 @@ public: */ timesync_msg_t newTimesyncMsg(); + /** + * @@brief Creates a new timesync status DDS message to be sent from the agent to the client + * @@return A new timesync status message with the origin in the agent and with the agent timestamp + */ + timesync_status_msg_t newTimesyncStatusMsg(); + /** * @@brief Get the time sync offset in nanoseconds * @@return The offset in nanoseconds @@ -203,6 +223,9 @@ public: private: std::atomic _offset_ns; + std::atomic _offset_prev; + std::atomic _remote_time_stamp; + std::atomic _rtti; @[if ros2_distro]@ /** @@ -221,6 +244,7 @@ private: bool _debug; std::unique_ptr _send_timesync_thread; + std::unique_ptr _send_timesync_status_thread; @[if ros2_distro]@ std::unique_ptr _timesync_node_thread; @[end if]@ @@ -247,18 +271,40 @@ private: inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1(); } @[end if]@ + /** Common timestamp setter **/ +@[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ + template + inline void setMsgTimestamp(T *msg, const uint64_t ×tamp) { msg->timestamp_() = timestamp; } +@[elif ros2_distro]@ + template + inline void setMsgTimestamp(T *msg, const uint64_t ×tamp) { msg->timestamp() = timestamp; } +@[end if]@ + /** Timesync msg Setters **/ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ - inline void setMsgTimestamp(timesync_msg_t *msg, const uint64_t ×tamp) { msg->timestamp_() = timestamp; } inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id_() = sys_id; } inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq_() = seq; } inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1_() = tc1; } inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1_() = ts1; } @[elif ros2_distro]@ - inline void setMsgTimestamp(timesync_msg_t *msg, const uint64_t ×tamp) { msg->timestamp() = timestamp; } inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id() = sys_id; } inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq() = seq; } inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1() = tc1; } inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1() = ts1; } @[end if]@ + + /** Timesync Status msg Setters **/ +@[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ + inline void setMsgSourceProtocol(timesync_status_msg_t *msg, const uint8_t &source_protocol) { msg->source_protocol_() = source_protocol; } + inline void setMsgRemoteTimeStamp(timesync_status_msg_t *msg, const uint64_t &remote_timestamp) { msg->remote_timestamp_() = remote_timestamp; } + inline void setMsgObservedOffset(timesync_status_msg_t *msg, const int64_t &observed_offset) { msg->observed_offset_() = observed_offset; } + inline void setMsgEstimatedOffset(timesync_status_msg_t *msg, const int64_t &estimated_offset) { msg->estimated_offset_() = estimated_offset; } + inline void setMsgRoundTripTime(timesync_status_msg_t *msg, const uint32_t &round_trip_time) { msg->round_trip_time_() = round_trip_time; } +@[elif ros2_distro]@ + inline void setMsgSourceProtocol(timesync_status_msg_t *msg, const uint8_t &source_protocol) { msg->source_protocol() = source_protocol; } + inline void setMsgRemoteTimeStamp(timesync_status_msg_t *msg, const uint64_t &remote_timestamp) { msg->remote_timestamp() = remote_timestamp; } + inline void setMsgObservedOffset(timesync_status_msg_t *msg, const int64_t &observed_offset) { msg->observed_offset() = observed_offset; } + inline void setMsgEstimatedOffset(timesync_status_msg_t *msg, const int64_t &estimated_offset) { msg->estimated_offset() = estimated_offset; } + inline void setMsgRoundTripTime(timesync_status_msg_t *msg, const uint32_t &round_trip_time) { msg->round_trip_time() = round_trip_time; } +@[end if]@ }; diff --git a/msg/timesync_status.msg b/msg/timesync_status.msg index df0a56ca8a..9d557352dc 100644 --- a/msg/timesync_status.msg +++ b/msg/timesync_status.msg @@ -4,7 +4,7 @@ uint8 SOURCE_PROTOCOL_MAVLINK = 0 uint8 SOURCE_PROTOCOL_RTPS = 1 uint8 source_protocol # timesync source. Source can be MAVLink or the microRTPS bridge -uint64 remote_timestamp # remote system timestamp (microseconds) +uint64 remote_timestamp # remote system timestamp (microseconds) int64 observed_offset # raw time offset directly observed from this timesync packet (microseconds) int64 estimated_offset # smoothed time offset between companion system and PX4 (microseconds) uint32 round_trip_time # round trip time of this timesync packet (microseconds) diff --git a/msg/tools/uorb_rtps_message_ids.yaml b/msg/tools/uorb_rtps_message_ids.yaml index 44c4fe8f10..b698b7d426 100644 --- a/msg/tools/uorb_rtps_message_ids.yaml +++ b/msg/tools/uorb_rtps_message_ids.yaml @@ -279,6 +279,7 @@ rtps: receive: true - msg: timesync_status id: 128 + send: true - msg: orb_test id: 129 - msg: orb_test_medium