Browse Source

microRTPS: agent: publish timesync status

master
TSC21 4 years ago committed by Nuno Marques
parent
commit
d31b7feb31
  1. 4
      msg/templates/uorb_microcdr/microRTPS_client.cpp.em
  2. 2
      msg/templates/urtps/RtpsTopics.cpp.em
  3. 2
      msg/templates/urtps/Subscriber.cpp.em
  4. 55
      msg/templates/urtps/microRTPS_timesync.cpp.em
  5. 58
      msg/templates/urtps/microRTPS_timesync.h.em
  6. 2
      msg/timesync_status.msg
  7. 1
      msg/tools/uorb_rtps_message_ids.yaml

4
msg/templates/uorb_microcdr/microRTPS_client.cpp.em

@ -69,7 +69,7 @@ receive_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] =
#include <px4_time.h> #include <px4_time.h>
#include <uORB/uORB.h> #include <uORB/uORB.h>
#include <uORB/Publication.hpp> #include <uORB/PublicationMulti.hpp>
#include <uORB/Subscription.hpp> #include <uORB/Subscription.hpp>
@[for topic in list(set(topic_names))]@ @[for topic in list(set(topic_names))]@
#include <uORB/topics/@(topic).h> #include <uORB/topics/@(topic).h>
@ -82,7 +82,7 @@ using namespace time_literals;
// Publishers for received messages // Publishers for received messages
struct RcvTopicsPubs { struct RcvTopicsPubs {
@[ for idx, topic in enumerate(recv_topics)]@ @[ for idx, topic in enumerate(recv_topics)]@
uORB::Publication <@(receive_base_types[idx])_s> @(topic)_pub{ORB_ID(@(topic))}; uORB::PublicationMulti <@(receive_base_types[idx])_s> @(topic)_pub{ORB_ID(@(topic))};
@[ end for]@ @[ end for]@
}; };
@[end if]@ @[end if]@

2
msg/templates/urtps/RtpsTopics.cpp.em

@ -84,6 +84,8 @@ bool RtpsTopics::init(std::condition_variable *t_send_queue_cv, std::mutex *t_se
std::cout << "- @(topic) publisher started" << std::endl; std::cout << "- @(topic) publisher started" << std::endl;
@[ if topic == 'Timesync' or topic == 'timesync']@ @[ if topic == 'Timesync' or topic == 'timesync']@
_timesync->start(&_@(topic)_pub); _timesync->start(&_@(topic)_pub);
@[ elif topic == 'TimesyncStatus' or topic == 'timesync_status']@
_timesync->init_status_pub(&_@(topic)_pub);
@[ end if]@ @[ end if]@
} else { } else {

2
msg/templates/urtps/Subscriber.cpp.em

@ -188,7 +188,7 @@ void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber *sub, Ma
{ {
@# Since the time sync runs on the bridge itself, it is required that there is a @# Since the time sync runs on the bridge itself, it is required that there is a
@# match between two topics of the same entity @# match between two topics of the same entity
@[if topic != 'Timesync' and topic != 'timesync']@ @[if topic != 'Timesync' and topic != 'timesync' and topic != 'TimesyncStatus' and topic != 'timesync_status']@
// The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain // The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain
// are the same for all its subcomponents (publishers, subscribers) // are the same for all its subcomponents (publishers, subscribers)
bool is_different_endpoint = false; bool is_different_endpoint = false;

55
msg/templates/urtps/microRTPS_timesync.cpp.em

@ -107,6 +107,21 @@ void TimeSync::start(TimesyncPublisher *pub)
_send_timesync_thread.reset(new std::thread(run)); _send_timesync_thread.reset(new std::thread(run));
} }
void TimeSync::init_status_pub(TimesyncStatusPublisher *status_pub)
{
auto run = [this, status_pub]() {
while (!_request_stop) {
timesync_status_msg_t status_msg = newTimesyncStatusMsg();
status_pub->publish(&status_msg);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
};
_request_stop = false;
_send_timesync_status_thread.reset(new std::thread(run));
}
void TimeSync::stop() void TimeSync::stop()
{ {
_request_stop = true; _request_stop = true;
@ -115,6 +130,7 @@ void TimeSync::stop()
if (_timesync_node_thread && _timesync_node_thread->joinable()) { _timesync_node_thread->join(); } if (_timesync_node_thread && _timesync_node_thread->joinable()) { _timesync_node_thread->join(); }
@[end if]@ @[end if]@
if (_send_timesync_thread && _send_timesync_thread->joinable()) { _send_timesync_thread->join(); } if (_send_timesync_thread && _send_timesync_thread->joinable()) { _send_timesync_thread->join(); }
if (_send_timesync_status_thread && _send_timesync_status_thread->joinable()) { _send_timesync_status_thread->join(); }
} }
void TimeSync::reset() void TimeSync::reset()
@ -124,23 +140,23 @@ void TimeSync::reset()
} }
@[if ros2_distro]@ @[if ros2_distro]@
int64_t TimeSync::getROSTimeNSec() uint64_t TimeSync::getROSTimeNSec() const
{ {
return _timesync_node->now().nanoseconds(); return _timesync_node->now().nanoseconds();
} }
int64_t TimeSync::getROSTimeUSec() uint64_t TimeSync::getROSTimeUSec() const
{ {
return RCL_NS_TO_US(getROSTimeNSec()); return RCL_NS_TO_US(getROSTimeNSec());
} }
@[else]@ @[else]@
int64_t TimeSync::getSteadyTimeNSec() uint64_t TimeSync::getSteadyTimeNSec() const
{ {
auto time = std::chrono::steady_clock::now(); auto time = std::chrono::steady_clock::now();
return std::chrono::time_point_cast<std::chrono::nanoseconds>(time).time_since_epoch().count(); return std::chrono::time_point_cast<std::chrono::nanoseconds>(time).time_since_epoch().count();
} }
int64_t TimeSync::getSteadyTimeUSec() uint64_t TimeSync::getSteadyTimeUSec() const
{ {
auto time = std::chrono::steady_clock::now(); auto time = std::chrono::steady_clock::now();
return std::chrono::time_point_cast<std::chrono::microseconds>(time).time_since_epoch().count(); return std::chrono::time_point_cast<std::chrono::microseconds>(time).time_since_epoch().count();
@ -149,10 +165,11 @@ int64_t TimeSync::getSteadyTimeUSec()
bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns) bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns)
{ {
int64_t rtti = local_t3_ns - local_t1_ns; _rtti = local_t3_ns - local_t1_ns;
_remote_time_stamp = remote_t2_ns;
// assume rtti is evenly split both directions // assume rtti is evenly split both directions
int64_t remote_t3_ns = remote_t2_ns + rtti / 2ll; int64_t remote_t3_ns = remote_t2_ns + _rtti.load() / 2ll;
int64_t measurement_offset = remote_t3_ns - local_t3_ns; int64_t measurement_offset = remote_t3_ns - local_t3_ns;
@ -181,8 +198,8 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
} }
// ignore if rtti > 50ms // ignore if rtti > 50ms
if (rtti > 50ll * 1000ll * 1000ll) { if (_rtti.load() > 50ll * 1000ll * 1000ll) {
if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tRTTI too high for timesync: " << rtti / (1000ll * 1000ll) << "ms\033[0m" << std::endl; } if (_debug) { std::cout << "\033[1;33m[ micrortps__timesync ]\tRTTI too high for timesync: " << _rtti.load() / (1000ll * 1000ll) << "ms\033[0m" << std::endl; }
return false; return false;
} }
@ -197,11 +214,11 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
beta = (1. - s) * BETA_INITIAL + s * BETA_FINAL; beta = (1. - s) * BETA_INITIAL + s * BETA_FINAL;
} }
int64_t offset_prev = _offset_ns.load(); _offset_prev = _offset_ns.load();
updateOffset(static_cast<int64_t>((_skew_ns_per_sync + _offset_ns.load()) * (1. - alpha) + updateOffset(static_cast<int64_t>((_skew_ns_per_sync + _offset_ns.load()) * (1. - alpha) +
measurement_offset * alpha)); measurement_offset * alpha));
_skew_ns_per_sync = _skew_ns_per_sync =
static_cast<int64_t>(beta * (_offset_ns.load() - offset_prev) + (1. - beta) * _skew_ns_per_sync); static_cast<int64_t>(beta * (_offset_ns.load() - _offset_prev.load()) + (1. - beta) * _skew_ns_per_sync);
_num_samples++; _num_samples++;
@ -263,3 +280,21 @@ timesync_msg_t TimeSync::newTimesyncMsg()
return msg; return msg;
} }
timesync_status_msg_t TimeSync::newTimesyncStatusMsg()
{
timesync_status_msg_t msg{};
@[if ros2_distro]@
setMsgTimestamp(&msg, getROSTimeUSec());
@[else]@
setMsgTimestamp(&msg, getSteadyTimeUSec());
@[end if]@
setMsgSourceProtocol(&msg, 1); // SOURCE_PROTOCOL_RTPS
setMsgRemoteTimeStamp(&msg, _remote_time_stamp.load() / 1000ULL);
setMsgObservedOffset(&msg, _offset_prev.load());
setMsgEstimatedOffset(&msg, _offset_ns.load());
setMsgRoundTripTime(&msg, _rtti.load() / 1000ll);
return msg;
}

58
msg/templates/urtps/microRTPS_timesync.h.em

@ -71,12 +71,14 @@ except AttributeError:
@[if ros2_distro]@ @[if ros2_distro]@
#include "Timesync_Publisher.h" #include "Timesync_Publisher.h"
#include "TimesyncStatus_Publisher.h"
#include <rcl/time.h> #include <rcl/time.h>
#include <rclcpp/clock.hpp> #include <rclcpp/clock.hpp>
#include <rclcpp/rclcpp.hpp> #include <rclcpp/rclcpp.hpp>
@[else]@ @[else]@
#include "timesync_Publisher.h" #include "timesync_Publisher.h"
#include "timesync_status_Publisher.h"
@[end if]@ @[end if]@
static constexpr double ALPHA_INITIAL = 0.05; static constexpr double ALPHA_INITIAL = 0.05;
@ -92,21 +94,27 @@ static constexpr int REQUEST_RESET_COUNTER_THRESHOLD = 5;
@[if version.parse(fastrtps_version) <= version.parse('1.7.2')]@ @[if version.parse(fastrtps_version) <= version.parse('1.7.2')]@
@[ if ros2_distro]@ @[ if ros2_distro]@
using timesync_msg_t = @(package)::msg::dds_::Timesync_; using timesync_msg_t = @(package)::msg::dds_::Timesync_;
using timesync_status_msg_t = @(package)::msg::dds_::TimesyncStatus_;
@[ else]@ @[ else]@
using timesync_msg_t = timesync_; using timesync_msg_t = timesync_;
using timesync_status_msg_t = timesync_status_;
@[ end if]@ @[ end if]@
@[else]@ @[else]@
@[ if ros2_distro]@ @[ if ros2_distro]@
using timesync_msg_t = @(package)::msg::Timesync; using timesync_msg_t = @(package)::msg::Timesync;
using timesync_status_msg_t = @(package)::msg::TimesyncStatus;
@[ else]@ @[ else]@
using timesync_msg_t = timesync; using timesync_msg_t = timesync;
using timesync_status_msg_t = timesync_status;
@[ end if]@ @[ end if]@
@[end if]@ @[end if]@
@# Sets the timesync publisher entity depending on using ROS2 or not @# Sets the timesync publisher entity depending on using ROS2 or not
@[if ros2_distro]@ @[if ros2_distro]@
using TimesyncPublisher = Timesync_Publisher; using TimesyncPublisher = Timesync_Publisher;
using TimesyncStatusPublisher = TimesyncStatus_Publisher;
@[else]@ @[else]@
using TimesyncPublisher = timesync_Publisher; using TimesyncPublisher = timesync_Publisher;
using TimesyncStatusPublisher = timesync_status_Publisher;
@[end if]@ @[end if]@
class TimeSync class TimeSync
@ -121,6 +129,12 @@ public:
*/ */
void start(TimesyncPublisher *pub); void start(TimesyncPublisher *pub);
/**
* @@brief Init and run the timesync status publisher thread
* @@param[in] pub The timesync status publisher entity to use
*/
void init_status_pub(TimesyncStatusPublisher *status_pub);
/** /**
* @@brief Resets the filter * @@brief Resets the filter
*/ */
@ -140,26 +154,26 @@ public:
* https://design.ros2.org/articles/clock_and_time.html * https://design.ros2.org/articles/clock_and_time.html
* @@return ROS time in nanoseconds * @@return ROS time in nanoseconds
*/ */
int64_t getROSTimeNSec(); uint64_t getROSTimeNSec() const;
/** /**
* @@brief Get ROS time in microseconds. Fetches the time from getROSTimeNSec() * @@brief Get ROS time in microseconds. Fetches the time from getROSTimeNSec()
* and converts it to microseconds * and converts it to microseconds
* @@return ROS time in microseconds * @@return ROS time in microseconds
*/ */
int64_t getROSTimeUSec(); uint64_t getROSTimeUSec() const;
@[else]@ @[else]@
/** /**
* @@brief Get clock monotonic time (raw) in nanoseconds * @@brief Get clock monotonic time (raw) in nanoseconds
* @@return Steady CLOCK_MONOTONIC time in nanoseconds * @@return Steady CLOCK_MONOTONIC time in nanoseconds
*/ */
int64_t getSteadyTimeNSec(); uint64_t getSteadyTimeNSec() const;
/** /**
* @@brief Get clock monotonic time (raw) in microseconds * @@brief Get clock monotonic time (raw) in microseconds
* @@return Steady CLOCK_MONOTONIC time in microseconds * @@return Steady CLOCK_MONOTONIC time in microseconds
*/ */
int64_t getSteadyTimeUSec(); uint64_t getSteadyTimeUSec() const;
@[end if]@ @[end if]@
/** /**
@ -183,6 +197,12 @@ public:
*/ */
timesync_msg_t newTimesyncMsg(); timesync_msg_t newTimesyncMsg();
/**
* @@brief Creates a new timesync status DDS message to be sent from the agent to the client
* @@return A new timesync status message with the origin in the agent and with the agent timestamp
*/
timesync_status_msg_t newTimesyncStatusMsg();
/** /**
* @@brief Get the time sync offset in nanoseconds * @@brief Get the time sync offset in nanoseconds
* @@return The offset in nanoseconds * @@return The offset in nanoseconds
@ -203,6 +223,9 @@ public:
private: private:
std::atomic<int64_t> _offset_ns; std::atomic<int64_t> _offset_ns;
std::atomic<int64_t> _offset_prev;
std::atomic<uint64_t> _remote_time_stamp;
std::atomic<uint32_t> _rtti;
@[if ros2_distro]@ @[if ros2_distro]@
/** /**
@ -221,6 +244,7 @@ private:
bool _debug; bool _debug;
std::unique_ptr<std::thread> _send_timesync_thread; std::unique_ptr<std::thread> _send_timesync_thread;
std::unique_ptr<std::thread> _send_timesync_status_thread;
@[if ros2_distro]@ @[if ros2_distro]@
std::unique_ptr<std::thread> _timesync_node_thread; std::unique_ptr<std::thread> _timesync_node_thread;
@[end if]@ @[end if]@
@ -247,18 +271,40 @@ private:
inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1(); } inline int64_t getMsgTS1(const timesync_msg_t *msg) { return msg->ts1(); }
@[end if]@ @[end if]@
/** Common timestamp setter **/
@[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@
template <typename T>
inline void setMsgTimestamp(T *msg, const uint64_t &timestamp) { msg->timestamp_() = timestamp; }
@[elif ros2_distro]@
template <typename T>
inline void setMsgTimestamp(T *msg, const uint64_t &timestamp) { msg->timestamp() = timestamp; }
@[end if]@
/** Timesync msg Setters **/ /** Timesync msg Setters **/
@[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@ @[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@
inline void setMsgTimestamp(timesync_msg_t *msg, const uint64_t &timestamp) { msg->timestamp_() = timestamp; }
inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id_() = sys_id; } inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id_() = sys_id; }
inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq_() = seq; } inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq_() = seq; }
inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1_() = tc1; } inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1_() = tc1; }
inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1_() = ts1; } inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1_() = ts1; }
@[elif ros2_distro]@ @[elif ros2_distro]@
inline void setMsgTimestamp(timesync_msg_t *msg, const uint64_t &timestamp) { msg->timestamp() = timestamp; }
inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id() = sys_id; } inline void setMsgSysID(timesync_msg_t *msg, const uint8_t &sys_id) { msg->sys_id() = sys_id; }
inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq() = seq; } inline void setMsgSeq(timesync_msg_t *msg, const uint8_t &seq) { msg->seq() = seq; }
inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1() = tc1; } inline void setMsgTC1(timesync_msg_t *msg, const int64_t &tc1) { msg->tc1() = tc1; }
inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1() = ts1; } inline void setMsgTS1(timesync_msg_t *msg, const int64_t &ts1) { msg->ts1() = ts1; }
@[end if]@ @[end if]@
/** Timesync Status msg Setters **/
@[if version.parse(fastrtps_version) <= version.parse('1.7.2') or not ros2_distro]@
inline void setMsgSourceProtocol(timesync_status_msg_t *msg, const uint8_t &source_protocol) { msg->source_protocol_() = source_protocol; }
inline void setMsgRemoteTimeStamp(timesync_status_msg_t *msg, const uint64_t &remote_timestamp) { msg->remote_timestamp_() = remote_timestamp; }
inline void setMsgObservedOffset(timesync_status_msg_t *msg, const int64_t &observed_offset) { msg->observed_offset_() = observed_offset; }
inline void setMsgEstimatedOffset(timesync_status_msg_t *msg, const int64_t &estimated_offset) { msg->estimated_offset_() = estimated_offset; }
inline void setMsgRoundTripTime(timesync_status_msg_t *msg, const uint32_t &round_trip_time) { msg->round_trip_time_() = round_trip_time; }
@[elif ros2_distro]@
inline void setMsgSourceProtocol(timesync_status_msg_t *msg, const uint8_t &source_protocol) { msg->source_protocol() = source_protocol; }
inline void setMsgRemoteTimeStamp(timesync_status_msg_t *msg, const uint64_t &remote_timestamp) { msg->remote_timestamp() = remote_timestamp; }
inline void setMsgObservedOffset(timesync_status_msg_t *msg, const int64_t &observed_offset) { msg->observed_offset() = observed_offset; }
inline void setMsgEstimatedOffset(timesync_status_msg_t *msg, const int64_t &estimated_offset) { msg->estimated_offset() = estimated_offset; }
inline void setMsgRoundTripTime(timesync_status_msg_t *msg, const uint32_t &round_trip_time) { msg->round_trip_time() = round_trip_time; }
@[end if]@
}; };

2
msg/timesync_status.msg

@ -4,7 +4,7 @@ uint8 SOURCE_PROTOCOL_MAVLINK = 0
uint8 SOURCE_PROTOCOL_RTPS = 1 uint8 SOURCE_PROTOCOL_RTPS = 1
uint8 source_protocol # timesync source. Source can be MAVLink or the microRTPS bridge uint8 source_protocol # timesync source. Source can be MAVLink or the microRTPS bridge
uint64 remote_timestamp # remote system timestamp (microseconds) uint64 remote_timestamp # remote system timestamp (microseconds)
int64 observed_offset # raw time offset directly observed from this timesync packet (microseconds) int64 observed_offset # raw time offset directly observed from this timesync packet (microseconds)
int64 estimated_offset # smoothed time offset between companion system and PX4 (microseconds) int64 estimated_offset # smoothed time offset between companion system and PX4 (microseconds)
uint32 round_trip_time # round trip time of this timesync packet (microseconds) uint32 round_trip_time # round trip time of this timesync packet (microseconds)

1
msg/tools/uorb_rtps_message_ids.yaml

@ -279,6 +279,7 @@ rtps:
receive: true receive: true
- msg: timesync_status - msg: timesync_status
id: 128 id: 128
send: true
- msg: orb_test - msg: orb_test
id: 129 id: 129
- msg: orb_test_medium - msg: orb_test_medium

Loading…
Cancel
Save