@############################################### @# @# EmPy template for generating microRTPS_agent.cpp file @# @############################################### @# Start of Template @# @# Context: @# - spec (msggen.MsgSpec) Parsed specification of the .msg file @# - ros2_distro (List[str]) ROS2 distro name @############################################### @{ import genmsg.msgs from px_generate_uorb_topic_files import MsgScope # this is in Tools/ try: ros2_distro = ros2_distro[0].decode("utf-8") except AttributeError: ros2_distro = ros2_distro[0] send_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.SEND] recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumerate(spec) if scope[idx] == MsgScope.RECEIVE] }@ /**************************************************************************** * * Copyright 2017 Proyectos y Sistemas de Mantenimiento SL (eProsima). * Copyright (c) 2018-2021 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. Neither the name of the copyright holder nor the names of its contributors * may be used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * ****************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "microRTPS_transport.h" #include "microRTPS_timesync.h" #include "RtpsTopics.h" @[if ros2_distro]@ #include @[end if]@ // Default values #define SLEEP_US 1 #define MAX_SLEEP_US 1000000 #define BAUDRATE 460800 #define MAX_DATA_RATE 10000000 #define DEVICE "/dev/ttyACM0" #define POLL_MS 1 #define MAX_POLL_MS 1000 #define DEFAULT_RECV_PORT 2020 #define DEFAULT_SEND_PORT 2019 #define DEFAULT_IP "127.0.0.1" using namespace eprosima; using namespace eprosima::fastrtps; volatile sig_atomic_t running = 1; std::unique_ptr transport_node; std::unique_ptr topics; uint32_t total_sent = 0, sent = 0; struct options { enum class eTransports { UART, UDP }; eTransports transport = options::eTransports::UART; char device[64] = DEVICE; int sleep_us = SLEEP_US; uint32_t baudrate = BAUDRATE; int poll_ms = POLL_MS; uint16_t recv_port = DEFAULT_RECV_PORT; uint16_t send_port = DEFAULT_SEND_PORT; char ip[16] = DEFAULT_IP; bool sw_flow_control = false; bool hw_flow_control = false; bool verbose_debug = false; std::string ns = ""; } _options; static void usage(const char *name) { printf("usage: %s [options]\n\n" " -b UART device baudrate. Defaults to 460800\n" " -d UART device. Defaults to /dev/ttyACM0\n" " -f Activates UART link SW flow control\n" " -g Activates UART link HW flow control\n" " -i Target remote IP address for UDP. Defaults to 127.0.0.1\n" " -n Topics namespace. Identifies the vehicle in a multi-agent network\n" " -o UART polling timeout in milliseconds. Defaults to 1ms\n" " -r UDP port for receiving (local). Defaults to 2020\n" " -s UDP port for sending (remote). Defaults to 2019\n" " -t [UART|UDP] Defaults to UART\n" " -v Add more verbosity\n" " -w Iteration time for data publishing to the DDS world, in microseconds.\n" " Defaults to 1us\n" " (ROS2 only) Allows to pass arguments to the timesync ROS2 node.\n" " Currently used for setting the usage of simulation time by the node using\n" " '--ros-args -p use_sim_time:=true'\n", name); } static int parse_options(int argc, char **argv) { static const struct option options[] = { {"baudrate", required_argument, NULL, 'b'}, {"device", required_argument, NULL, 'd'}, {"sw-flow-control", no_argument, NULL, 'f'}, {"hw-flow-control", no_argument, NULL, 'g'}, {"ip-address", required_argument, NULL, 'i'}, {"namespace", required_argument, NULL, 'n'}, {"poll-ms", required_argument, NULL, 'o'}, {"reception-port", required_argument, NULL, 'r'}, {"sending-port", required_argument, NULL, 's'}, {"transport", required_argument, NULL, 't'}, {"increase-verbosity", no_argument, NULL, 'v'}, {"sleep-time-us", required_argument, NULL, 'w'}, {"ros-args", required_argument, NULL, 0}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0}}; int ch; while ((ch = getopt_long(argc, argv, "t:d:w:b:o:r:s:i:fghvn:", options, nullptr)) >= 0) { switch (ch) { case 't': _options.transport = strcmp(optarg, "UDP") == 0 ? options::eTransports::UDP : options::eTransports::UART; break; case 'd': if (nullptr != optarg) strcpy(_options.device, optarg); break; case 'w': _options.sleep_us = strtoul(optarg, nullptr, 10); break; case 'b': _options.baudrate = strtoul(optarg, nullptr, 10); break; case 'o': _options.poll_ms = strtol(optarg, nullptr, 10); break; case 'r': _options.recv_port = strtoul(optarg, nullptr, 10); break; case 's': _options.send_port = strtoul(optarg, nullptr, 10); break; case 'i': if (nullptr != optarg) strcpy(_options.ip, optarg); break; case 'f': _options.sw_flow_control = true; break; case 'g': _options.hw_flow_control = true; break; case 'h': usage(argv[0]); exit(0); break; case 'v': _options.verbose_debug = true; break; case 'n': if (nullptr != optarg) _options.ns = std::string(optarg) + "/"; break; default: @[if ros2_distro]@ break; @[else]@ usage(argv[0]); return -1; @[end if]@ } } if (_options.poll_ms < POLL_MS) { _options.poll_ms = POLL_MS; printf("\033[1;33m[ micrortps_agent ]\tPoll timeout too low. Using %d ms instead\033[0m\n", POLL_MS); } else if (_options.poll_ms > MAX_POLL_MS) { _options.poll_ms = MAX_POLL_MS; printf("\033[1;33m[ micrortps_agent ]\tPoll timeout too high. Using %d ms instead\033[0m\n", MAX_POLL_MS); } if (_options.sleep_us > MAX_SLEEP_US) { _options.sleep_us = MAX_SLEEP_US; printf("\033[1;33m[ micrortps_agent ]\tPublishing iteration cycle too slow. Using %d us instead\033[0m\n", MAX_SLEEP_US); } if (_options.hw_flow_control && _options.sw_flow_control) { printf("\033[0;31m[ micrortps_agent ]\tHW and SW flow control set. Please set only one or another\033[0m\n"); return -1; } return 0; } @[if recv_topics]@ std::atomic exit_sender_thread(false); std::condition_variable t_send_queue_cv; std::mutex t_send_queue_mutex; std::queue t_send_queue; void t_send(void *) { char data_buffer[BUFFER_SIZE] = {}; uint32_t length = 0; uint8_t topic_ID = 255; while (running && !exit_sender_thread) { std::unique_lock lk(t_send_queue_mutex); while (t_send_queue.empty() && !exit_sender_thread) { t_send_queue_cv.wait(lk); } topic_ID = t_send_queue.front(); t_send_queue.pop(); lk.unlock(); size_t header_length = transport_node->get_header_length(); /* make room for the header to fill in later */ eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer) - header_length); eprosima::fastcdr::Cdr scdr(cdrbuffer); if (!exit_sender_thread) { if (topics->getMsg(topic_ID, scdr)) { length = scdr.getSerializedDataLength(); if (0 < (length = transport_node->write(topic_ID, data_buffer, length))) { total_sent += length; ++sent; } } } } } @[end if]@ void signal_handler(int signum) { printf("\n\033[1;33m[ micrortps_agent ]\tInterrupt signal (%d) received.\033[0m\n", signum); running = 0; transport_node->close(); } int main(int argc, char **argv) { printf("\033[0;37m--- MicroRTPS Agent ---\033[0m\n"); if (-1 == parse_options(argc, argv)) { printf("\033[1;33m[ micrortps_agent ]\tEXITING...\033[0m\n"); return -1; } topics = std::make_unique(); @[if ros2_distro]@ // Initialize communications via the rmw implementation and set up a global signal handler. rclcpp::init(argc, argv, rclcpp::InitOptions()); @[end if]@ // register signal SIGINT and signal handler signal(SIGINT, signal_handler); printf("[ micrortps_agent ]\tStarting link...\n"); const char* localhost_only = std::getenv("ROS_LOCALHOST_ONLY"); if (localhost_only && strcmp(localhost_only, "1") == 0) { printf("[ micrortps_agent ]\tUsing only the localhost network...\n"); } /** * Set the system ID to Mission Computer, in order to identify the agent side * * Note: theoretically a multi-agent system is possible, but this would require * adjustments in the way the timesync is done (would have to create a timesync * instance per agent). Keeping it contained for a 1:1 link for now is reasonable. */ const uint8_t sys_id = static_cast(MicroRtps::System::MISSION_COMPUTER); switch (_options.transport) { case options::eTransports::UART: { transport_node = std::make_unique(_options.device, _options.baudrate, _options.poll_ms, _options.sw_flow_control, _options.hw_flow_control, sys_id, _options.verbose_debug); printf("[ micrortps_agent ]\tUART transport: device: %s; baudrate: %d; poll: %dms; flow_control: %s\n", _options.device, _options.baudrate, _options.poll_ms, _options.sw_flow_control ? "SW enabled" : (_options.hw_flow_control ? "HW enabled" : "No")); } break; case options::eTransports::UDP: { transport_node = std::make_unique(_options.ip, _options.recv_port, _options.send_port, sys_id, _options.verbose_debug); printf("[ micrortps_agent ]\tUDP transport: ip address: %s; recv port: %u; send port: %u\n", _options.ip, _options.recv_port, _options.send_port); } break; default: printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n"); return -1; } if (0 > transport_node->init()) { printf("\033[0;37m[ micrortps_agent ]\tEXITING...\033[0m\n"); return -1; } sleep(1); @[if send_topics]@ char data_buffer[BUFFER_SIZE] = {}; int received = 0, loop = 0; int length = 0, total_read = 0; bool receiving = false; uint8_t topic_ID = 255; std::chrono::time_point start, end; @[end if]@ // Init timesync topics->set_timesync(std::make_shared(_options.verbose_debug)); @[if recv_topics]@ topics->init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns); @[end if]@ running = true; @[if recv_topics]@ std::thread sender_thread(t_send, nullptr); @[end if]@ while (running) { @[if send_topics]@ ++loop; if (!receiving) { start = std::chrono::steady_clock::now(); } // Publish messages received from UART if (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) { topics->publish(topic_ID, data_buffer, sizeof(data_buffer)); ++received; total_read += length; receiving = true; end = std::chrono::steady_clock::now(); } @[else]@ usleep(_options.sleep_us); @[end if]@ } @[if recv_topics]@ exit_sender_thread = true; t_send_queue_cv.notify_one(); sender_thread.join(); std::chrono::duration elapsed_secs = end - start; if (received > 0) { printf("[ micrortps_agent ]\tRECEIVED: %d messages - %d bytes; %d LOOPS - %.03f seconds - %.02fKB/s\n", received, total_read, loop, elapsed_secs.count(), static_cast(total_read) / (1000 * elapsed_secs.count())); } @[end if]@ @[if recv_topics]@ if (sent > 0) { printf("[ micrortps_agent ]\tSENT: %lu messages - %lu bytes\n", static_cast(sent), static_cast(total_sent)); } @[end if]@ @[if ros2_distro]@ rclcpp::shutdown(); @[end if]@ transport_node.reset(); topics.reset(); return 0; }