You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
7.4 KiB

@###############################################
@#
@# EmPy template for generating microRTPS_client.cpp file
@#
@###############################################
@# Start of Template
@#
@# Context:
@# - msgs (List) list of all msg files
@# - multi_topics (List) list of all multi-topic names
@# - ids (List) list of all RTPS msg ids
@###############################################
@{
import os
import genmsg.msgs
from px_generate_uorb_topic_helper import * # this is in Tools/
from px_generate_uorb_topic_files import MsgScope # this is in Tools/
topic_names = [s.short_name for s in spec]
send_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.SEND]
send_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] == MsgScope.SEND]
recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.RECEIVE]
receive_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] == MsgScope.RECEIVE]
}@
/****************************************************************************
*
* Copyright (c) 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima).
* Copyright (c) 2018-2019 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
#include "microRTPS_transport.h"
#include "microRTPS_client.h"
#include <inttypes.h>
#include <cstdio>
#include <ctime>
#include <pthread.h>
#include <ucdr/microcdr.h>
#include <px4_time.h>
#include <uORB/uORB.h>
#include <uORB/Publication.hpp>
#include <uORB/Subscription.hpp>
@[for topic in list(set(topic_names))]@
#include <uORB/topics/@(topic).h>
#include <uORB_microcdr/topics/@(topic).h>
@[end for]@
void* send(void *data);
@[if send_topics]@
void* send(void* /*unused*/)
{
char data_buffer[BUFFER_SIZE] = {};
uint64_t sent = 0, total_sent = 0;
int loop = 0, read = 0;
uint32_t length = 0;
size_t header_length = 0;
/* subscribe to topics */
@[for idx, topic in enumerate(send_topics)]@
uORB::Subscription @(topic)_sub{ORB_ID(@(topic))};
@[end for]@
// ucdrBuffer to serialize using the user defined buffer
ucdrBuffer writer;
header_length = transport_node->get_header_length();
ucdr_init_buffer(&writer, reinterpret_cast<uint8_t*>(&data_buffer[header_length]), BUFFER_SIZE - header_length);
struct timespec begin;
px4_clock_gettime(CLOCK_REALTIME, &begin);
while (!_should_exit_task)
{
@[for idx, topic in enumerate(send_topics)]@
@(send_base_types[idx])_s @(topic)_data;
if (@(topic)_sub.update(&@(topic)_data)) {
// copy raw data into local buffer. Payload is shifted by header length to make room for header
serialize_@(send_base_types[idx])(&writer, &@(topic)_data, &data_buffer[header_length], &length);
if (0 < (read = transport_node->write(static_cast<char>(@(rtps_message_id(ids, topic))), data_buffer, length)))
{
total_sent += read;
++sent;
}
}
@[end for]@
px4_usleep(_options.sleep_ms * 1000);
++loop;
}
struct timespec end;
px4_clock_gettime(CLOCK_REALTIME, &end);
double elapsed_secs = end.tv_sec - begin.tv_sec + (end.tv_nsec - begin.tv_nsec) / 1e9;
PX4_INFO("SENT: %" PRIu64 " messages in %d LOOPS, %" PRIu64 " bytes in %.03f seconds - %.02fKB/s",
sent, loop, total_sent, elapsed_secs, total_sent / (1e3 * elapsed_secs));
return nullptr;
}
static int launch_send_thread(pthread_t &sender_thread)
{
pthread_attr_t sender_thread_attr;
pthread_attr_init(&sender_thread_attr);
pthread_attr_setstacksize(&sender_thread_attr, PX4_STACK_ADJUSTED(4000));
struct sched_param param;
(void)pthread_attr_getschedparam(&sender_thread_attr, &param);
param.sched_priority = SCHED_PRIORITY_DEFAULT;
(void)pthread_attr_setschedparam(&sender_thread_attr, &param);
pthread_create(&sender_thread, &sender_thread_attr, send, nullptr);
pthread_attr_destroy(&sender_thread_attr);
return 0;
}
@[end if]@
void micrortps_start_topics(struct timespec &begin, uint64_t &total_read, uint64_t &received, int &loop)
{
@[if recv_topics]@
char data_buffer[BUFFER_SIZE] = {};
int read = 0;
uint8_t topic_ID = 255;
// Declare received topics
@[for idx, topic in enumerate(recv_topics)]@
@(receive_base_types[idx])_s @(topic)_data;
uORB::Publication<@(receive_base_types[idx])_s> @(topic)_pub{ORB_ID(@(topic))};
@[end for]@
// ucdrBuffer to deserialize using the user defined buffer
ucdrBuffer reader;
ucdr_init_buffer(&reader, reinterpret_cast<uint8_t*>(data_buffer), BUFFER_SIZE);
@[end if]@
px4_clock_gettime(CLOCK_REALTIME, &begin);
_should_exit_task = false;
@[if send_topics]@
// create a thread for sending data to the simulator
pthread_t sender_thread;
launch_send_thread(sender_thread);
@[end if]@
while (!_should_exit_task)
{
@[if recv_topics]@
while (0 < (read = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE)))
{
total_read += read;
switch (topic_ID)
{
@[for idx, topic in enumerate(recv_topics)]@
case @(rtps_message_id(ids, topic)):
{
deserialize_@(receive_base_types[idx])(&reader, &@(topic)_data, data_buffer);
@(topic)_pub.publish(@(topic)_data);
++received;
}
break;
@[end for]@
default:
PX4_WARN("Unexpected topic ID\n");
break;
}
}
@[end if]@
// loop forever if informed loop number is negative
if (_options.loops >= 0 && loop >= _options.loops) break;
px4_usleep(_options.sleep_ms * 1000);
++loop;
}
@[if send_topics]@
_should_exit_task = true;
pthread_join(sender_thread, nullptr);
@[end if]@
}