You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

404 lines
14 KiB

@###############################################
@#
@# EmPy template for generating microRTPS_agent.cpp file
@#
@###############################################
@# Start of Template
@#
@# Context:
@# - spec (msggen.MsgSpec) Parsed specification of the .msg file
@# - ros2_distro (List[str]) ROS2 distro name
@###############################################
@{
import genmsg.msgs
from px_generate_uorb_topic_files import MsgScope # this is in Tools/
try:
ros2_distro = ros2_distro[0].decode("utf-8")
except AttributeError:
ros2_distro = ros2_distro[0]
send_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.SEND]
recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.RECEIVE]
}@
/****************************************************************************
*
* Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima).
* Copyright (c) 2018-2021 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
#include <thread>
#include <atomic>
#include <getopt.h>
#include <unistd.h>
#include <poll.h>
#include <chrono>
#include <ctime>
#include <csignal>
#include <termios.h>
#include <condition_variable>
#include <queue>
#include <fastcdr/Cdr.h>
#include <fastcdr/FastCdr.h>
#include <fastcdr/exceptions/Exception.h>
#include <fastrtps/Domain.h>
#include "microRTPS_transport.h"
#include "microRTPS_timesync.h"
#include "RtpsTopics.h"
@[if ros2_distro]@
#include <rclcpp/rclcpp.hpp>
@[end if]@
// Default values
#define SLEEP_US 1
#define MAX_SLEEP_US 1000000
#define BAUDRATE 460800
#define MAX_DATA_RATE 10000000
#define DEVICE "/dev/ttyACM0"
#define POLL_MS 1
#define MAX_POLL_MS 1000
#define DEFAULT_RECV_PORT 2020
#define DEFAULT_SEND_PORT 2019
#define DEFAULT_IP "127.0.0.1"
using namespace eprosima;
using namespace eprosima::fastrtps;
volatile sig_atomic_t running = 1;
std::unique_ptr<Transport_node> transport_node;
std::unique_ptr<RtpsTopics> topics;
uint32_t total_sent = 0, sent = 0;
struct options {
enum class eTransports {
UART,
UDP
};
eTransports transport = options::eTransports::UART;
char device[64] = DEVICE;
int sleep_us = SLEEP_US;
uint32_t baudrate = BAUDRATE;
int poll_ms = POLL_MS;
uint16_t recv_port = DEFAULT_RECV_PORT;
uint16_t send_port = DEFAULT_SEND_PORT;
char ip[16] = DEFAULT_IP;
bool sw_flow_control = false;
bool hw_flow_control = false;
bool verbose_debug = false;
std::string ns = "";
} _options;
static void usage(const char *name)
{
printf("usage: %s [options]\n\n"
" -b <baudrate> UART device baudrate. Defaults to 460800\n"
" -d <device> UART device. Defaults to /dev/ttyACM0\n"
" -f <sw-flow-control> Activates UART link SW flow control\n"
" -g <hw-flow-control> Activates UART link HW flow control\n"
" -i <ip-address> Target remote IP address for UDP. Defaults to 127.0.0.1\n"
" -n <namespace> Topics namespace. Identifies the vehicle in a multi-agent network\n"
" -o <poll-ms> UART polling timeout in milliseconds. Defaults to 1ms\n"
" -r <reception-port> UDP port for receiving (local). Defaults to 2020\n"
" -s <sending-port> UDP port for sending (remote). Defaults to 2019\n"
" -t <transport> [UART|UDP] Defaults to UART\n"
" -v <increase-verbosity> Add more verbosity\n"
" -w <sleep-time-us> Iteration time for data publishing to the DDS world, in microseconds.\n"
" Defaults to 1us\n"
" <ros-args> (ROS2 only) Allows to pass arguments to the timesync ROS2 node.\n"
" Currently used for setting the usage of simulation time by the node using\n"
" '--ros-args -p use_sim_time:=true'\n",
name);
}
static int parse_options(int argc, char **argv)
{
static const struct option options[] = {
{"baudrate", required_argument, NULL, 'b'},
{"device", required_argument, NULL, 'd'},
{"sw-flow-control", no_argument, NULL, 'f'},
{"hw-flow-control", no_argument, NULL, 'g'},
{"ip-address", required_argument, NULL, 'i'},
{"namespace", required_argument, NULL, 'n'},
{"poll-ms", required_argument, NULL, 'o'},
{"reception-port", required_argument, NULL, 'r'},
{"sending-port", required_argument, NULL, 's'},
{"transport", required_argument, NULL, 't'},
{"increase-verbosity", no_argument, NULL, 'v'},
{"sleep-time-us", required_argument, NULL, 'w'},
{"ros-args", required_argument, NULL, 0},
{"help", no_argument, NULL, 'h'},
{NULL, 0, NULL, 0}};
int ch;
while ((ch = getopt_long(argc, argv, "t:d:w:b:o:r:s:i:fghvn:", options, nullptr)) >= 0) {
switch (ch) {
case 't': _options.transport = strcmp(optarg, "UDP") == 0 ?
options::eTransports::UDP
: options::eTransports::UART; break;
case 'd': if (nullptr != optarg) strcpy(_options.device, optarg); break;
case 'w': _options.sleep_us = strtoul(optarg, nullptr, 10); break;
case 'b': _options.baudrate = strtoul(optarg, nullptr, 10); break;
case 'o': _options.poll_ms = strtol(optarg, nullptr, 10); break;
case 'r': _options.recv_port = strtoul(optarg, nullptr, 10); break;
case 's': _options.send_port = strtoul(optarg, nullptr, 10); break;
case 'i': if (nullptr != optarg) strcpy(_options.ip, optarg); break;
case 'f': _options.sw_flow_control = true; break;
case 'g': _options.hw_flow_control = true; break;
case 'h': usage(argv[0]); exit(0); break;
case 'v': _options.verbose_debug = true; break;
case 'n': if (nullptr != optarg) _options.ns = std::string(optarg) + "/"; break;
default:
@[if ros2_distro]@
break;
@[else]@
usage(argv[0]);
return -1;
@[end if]@
}
}
if (_options.poll_ms < POLL_MS) {
_options.poll_ms = POLL_MS;
printf("\033[1;33m[ micrortps_agent ]\tPoll timeout too low. Using %d ms instead\033[0m\n", POLL_MS);
} else if (_options.poll_ms > MAX_POLL_MS) {
_options.poll_ms = MAX_POLL_MS;
printf("\033[1;33m[ micrortps_agent ]\tPoll timeout too high. Using %d ms instead\033[0m\n", MAX_POLL_MS);
}
if (_options.sleep_us > MAX_SLEEP_US) {
_options.sleep_us = MAX_SLEEP_US;
printf("\033[1;33m[ micrortps_agent ]\tPublishing iteration cycle too slow. Using %d us instead\033[0m\n", MAX_SLEEP_US);
}
if (_options.hw_flow_control && _options.sw_flow_control) {
printf("\033[0;31m[ micrortps_agent ]\tHW and SW flow control set. Please set only one or another\033[0m\n");
return -1;
}
return 0;
}
@[if recv_topics]@
std::atomic<bool> exit_sender_thread(false);
std::condition_variable t_send_queue_cv;
std::mutex t_send_queue_mutex;
std::queue<uint8_t> t_send_queue;
void t_send(void *)
{
char data_buffer[BUFFER_SIZE] = {};
uint32_t length = 0;
uint8_t topic_ID = 255;
while (running && !exit_sender_thread) {
std::unique_lock<std::mutex> lk(t_send_queue_mutex);
while (t_send_queue.empty() && !exit_sender_thread) {
t_send_queue_cv.wait(lk);
}
topic_ID = t_send_queue.front();
t_send_queue.pop();
lk.unlock();
size_t header_length = transport_node->get_header_length();
/* make room for the header to fill in later */
eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer) - header_length);
eprosima::fastcdr::Cdr scdr(cdrbuffer);
if (!exit_sender_thread) {
if (topics->getMsg(topic_ID, scdr)) {
length = scdr.getSerializedDataLength();
if (0 < (length = transport_node->write(topic_ID, data_buffer, length))) {
total_sent += length;
++sent;
}
}
}
}
}
@[end if]@
void signal_handler(int signum)
{
printf("\n\033[1;33m[ micrortps_agent ]\tInterrupt signal (%d) received.\033[0m\n", signum);
running = 0;
transport_node->close();
}
int main(int argc, char **argv)
{
printf("\033[0;37m--- MicroRTPS Agent ---\033[0m\n");
if (-1 == parse_options(argc, argv)) {
printf("\033[1;33m[ micrortps_agent ]\tEXITING...\033[0m\n");
return -1;
}
topics = std::make_unique<RtpsTopics>();
@[if ros2_distro]@
// Initialize communications via the rmw implementation and set up a global signal handler.
rclcpp::init(argc, argv, rclcpp::InitOptions());
@[end if]@
// register signal SIGINT and signal handler
signal(SIGINT, signal_handler);
printf("[ micrortps_agent ]\tStarting link...\n");
const char* localhost_only = std::getenv("ROS_LOCALHOST_ONLY");
if (localhost_only && strcmp(localhost_only, "1") == 0) {
printf("[ micrortps_agent ]\tUsing only the localhost network...\n");
}
/**
* Set the system ID to Mission Computer, in order to identify the agent side
*
* Note: theoretically a multi-agent system is possible, but this would require
* adjustments in the way the timesync is done (would have to create a timesync
* instance per agent). Keeping it contained for a 1:1 link for now is reasonable.
*/
const uint8_t sys_id = static_cast<uint8_t>(MicroRtps::System::MISSION_COMPUTER);
switch (_options.transport) {
case options::eTransports::UART: {
transport_node = std::make_unique<UART_node>(_options.device, _options.baudrate, _options.poll_ms,
_options.sw_flow_control, _options.hw_flow_control, sys_id, _options.verbose_debug);
printf("[ micrortps_agent ]\tUART transport: device: %s; baudrate: %d; poll: %dms; flow_control: %s\n",
_options.device, _options.baudrate, _options.poll_ms,
_options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No"));
}
break;
case options::eTransports::UDP: {
transport_node = std::make_unique<UDP_node>(_options.ip, _options.recv_port, _options.send_port,
sys_id, _options.verbose_debug);
printf("[ micrortps_agent ]\tUDP transport: ip address: %s; recv port: %u; send port: %u\n",
_options.ip, _options.recv_port, _options.send_port);
}
break;
default:
printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n");
return -1;
}
if (0 > transport_node->init()) {
printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n");
return -1;
}
sleep(1);
@[if send_topics]@
char data_buffer[BUFFER_SIZE] = {};
int received = 0, loop = 0;
int length = 0, total_read = 0;
bool receiving = false;
uint8_t topic_ID = 255;
std::chrono::time_point<std::chrono::steady_clock> start, end;
@[end if]@
// Init timesync
topics->set_timesync(std::make_shared<TimeSync>(_options.verbose_debug));
@[if recv_topics]@
topics->init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns);
@[end if]@
running = true;
@[if recv_topics]@
std::thread sender_thread(t_send, nullptr);
@[end if]@
while (running) {
@[if send_topics]@
++loop;
if (!receiving) { start = std::chrono::steady_clock::now(); }
// Publish messages received from UART
if (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) {
topics->publish(topic_ID, data_buffer, sizeof(data_buffer));
++received;
total_read += length;
receiving = true;
end = std::chrono::steady_clock::now();
}
@[else]@
usleep(_options.sleep_us);
@[end if]@
}
@[if recv_topics]@
exit_sender_thread = true;
t_send_queue_cv.notify_one();
sender_thread.join();
std::chrono::duration<double> elapsed_secs = end - start;
if (received > 0) {
printf("[ micrortps_agent ]\tRECEIVED: %d messages - %d bytes; %d LOOPS - %.03f seconds - %.02fKB/s\n",
received, total_read, loop, elapsed_secs.count(), static_cast<double>(total_read) / (1000 * elapsed_secs.count()));
}
@[end if]@
@[if recv_topics]@
if (sent > 0) {
printf("[ micrortps_agent ]\tSENT: %lu messages - %lu bytes\n", static_cast<unsigned long>(sent),
static_cast<unsigned long>(total_sent));
}
@[end if]@
@[if ros2_distro]@
rclcpp::shutdown();
@[end if]@
transport_node.reset();
topics.reset();
return 0;
}