@ -79,6 +79,12 @@ receive_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] =
@@ -79,6 +79,12 @@ receive_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] =
uint8_t last_remote_msg_seq = 0;
uint8_t last_msg_seq = 0;
uint64_t tx_last_sec_read = 0;
uint64_t rx_last_sec_read = 0;
pthread_mutex_t tx_lock;
pthread_mutex_t rx_lock;
@[if recv_topics]@
// Publishers for received messages
struct RcvTopicsPubs {
@ -101,10 +107,12 @@ struct SendTopicsSubs {
@@ -101,10 +107,12 @@ struct SendTopicsSubs {
struct SendThreadArgs {
uint64_t &total_sent;
uint64_t &sent_last_sec;
uint64_t &sent;
int &sent_loop;
SendThreadArgs(uint64_t &total_sent_, uint64_t &sent_, int &sent_loop_)
SendThreadArgs(uint64_t &total_sent_, uint64_t &sent_last_sec_, uint64_t &sent_ , int &sent_loop_)
: total_sent(total_sent_),
sent_last_sec(sent_last_sec_),
sent(sent_),
sent_loop(sent_loop_) {}
};
@ -123,6 +131,14 @@ void *send(void *args)
@@ -123,6 +131,14 @@ void *send(void *args)
header_length = transport_node->get_header_length();
ucdr_init_buffer(&writer, reinterpret_cast<uint8_t *>(&data_buffer[header_length]), BUFFER_SIZE - header_length);
pthread_t tx_per_second_thread;
int rc = pthread_create(&tx_per_second_thread, nullptr, &tx_per_second, (void *)&data->sent_last_sec);
if (rc != 0) {
errno = rc;
PX4_ERR("Could not create tx counter thread (%d)", errno);
return nullptr;
}
while (!_should_exit_task) {
@[ for idx, topic in enumerate(send_topics)]@
{
@ -147,6 +163,9 @@ void *send(void *args)
@@ -147,6 +163,9 @@ void *send(void *args)
if (0 < (read = transport_node->write(static_cast<char>(@(rtps_message_id(ids, topic))), data_buffer, length))) {
data->total_sent += read;
pthread_mutex_lock(&tx_lock);
tx_last_sec_read += read;
pthread_mutex_unlock(&tx_lock);
++data->sent;
}
@ -161,11 +180,36 @@ void *send(void *args)
@@ -161,11 +180,36 @@ void *send(void *args)
++data->sent_loop;
}
pthread_join(tx_per_second_thread, nullptr);
delete subs;
return nullptr;
}
void *tx_per_second(void *sent_last_sec)
{
while (!_should_exit_task) {
pthread_mutex_lock(&tx_lock);
*((uint64_t *) sent_last_sec) = tx_last_sec_read;
tx_last_sec_read = 0;
pthread_mutex_unlock(&tx_lock);
px4_sleep(1);
}
return nullptr;
}
void *rx_per_second(void *rcvd_last_sec)
{
while (!_should_exit_task) {
pthread_mutex_lock(&rx_lock);
*((uint64_t *) rcvd_last_sec) = rx_last_sec_read;
rx_last_sec_read = 0;
pthread_mutex_unlock(&rx_lock);
px4_sleep(1);
}
return nullptr;
}
static int launch_send_thread(pthread_t &sender_thread, struct SendThreadArgs &args)
{
pthread_attr_t sender_thread_attr;
@ -192,8 +236,8 @@ static int launch_send_thread(pthread_t &sender_thread, struct SendThreadArgs &a
@@ -192,8 +236,8 @@ static int launch_send_thread(pthread_t &sender_thread, struct SendThreadArgs &a
}
@[end if]@
void micrortps_start_topics(struct timespec &begin, uint64_t &total_read, uint64_t &total_sent, uint64_t &received ,
uint64_t &sent, int &rcvd_loop, int &sent_loop)
void micrortps_start_topics(struct timespec &begin, uint64_t &total_rcvd, uint64_t &total_sent, uint64_t &sent_last_sec ,
uint64_t &rcvd_last_sec, uint64_t &received, uint64_t & sent, int &rcvd_loop, int &sent_loop)
{
@[if recv_topics]@
char data_buffer[BUFFER_SIZE] = {};
@ -208,13 +252,21 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_read, uint64
@@ -208,13 +252,21 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_read, uint64
// ucdrBuffer to deserialize using the user defined buffer
ucdrBuffer reader;
ucdr_init_buffer(&reader, reinterpret_cast<uint8_t *>(data_buffer), BUFFER_SIZE);
pthread_t rx_per_second_thread;
int rc = pthread_create(&rx_per_second_thread, nullptr, &rx_per_second, (void *)&rcvd_last_sec);
if (rc != 0) {
errno = rc;
PX4_ERR("Could not create rx counter thread (%d)", errno);
return;
}
@[end if]@
px4_clock_gettime(CLOCK_REALTIME, &begin);
_should_exit_task = false;
@[if send_topics]@
// var struct to be updated on the thread
SendThreadArgs *sender_thread_args = new SendThreadArgs(total_sent, sent, sent_loop);
SendThreadArgs *sender_thread_args = new SendThreadArgs(total_sent, sent_last_sec, sent , sent_loop);
// create a thread for sending data
pthread_t sender_thread;
@ -224,7 +276,11 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_read, uint64
@@ -224,7 +276,11 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_read, uint64
while (!_should_exit_task) {
@[if recv_topics]@
while (0 < (read = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) {
total_read += read;
total_rcvd += read;
pthread_mutex_lock(&rx_lock);
rcvd_last_sec += read;
pthread_mutex_unlock(&rx_lock);
uint64_t read_time = hrt_absolute_time();
switch (topic_ID) {
@ -258,12 +314,13 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_read, uint64
@@ -258,12 +314,13 @@ void micrortps_start_topics(struct timespec &begin, uint64_t &total_read, uint64
++rcvd_loop;
}
@[if recv_topics]@
delete pubs;
@[end if]@
@[if send_topics]@
delete sender_thread_args;
_should_exit_task = true;
pthread_join(sender_thread, nullptr);
@[end if]@
@[if recv_topics]@
pthread_join(rx_per_second_thread, nullptr);
delete pubs;
@[end if]@
}