diff --git a/platforms/posix/src/main.cpp b/platforms/posix/src/main.cpp index 91739014cb..7cf1cae21a 100644 --- a/platforms/posix/src/main.cpp +++ b/platforms/posix/src/main.cpp @@ -173,8 +173,6 @@ int main(int argc, char **argv) argv[0] += path_length + strlen(prefix); px4_daemon::Client client(instance); - client.generate_uuid(); - client.register_sig_handler(); return client.process_args(argc, (const char **)argv); } else { diff --git a/platforms/posix/src/px4_daemon/CMakeLists.txt b/platforms/posix/src/px4_daemon/CMakeLists.txt index 9704af02c1..e6cf9fcc75 100644 --- a/platforms/posix/src/px4_daemon/CMakeLists.txt +++ b/platforms/posix/src/px4_daemon/CMakeLists.txt @@ -37,6 +37,6 @@ px4_add_library(px4_daemon client.cpp server.cpp server_io.cpp - pipe_protocol.cpp + sock_protocol.cpp ) diff --git a/platforms/posix/src/px4_daemon/client.cpp b/platforms/posix/src/px4_daemon/client.cpp index 5f76cf1a6b..194875aabe 100644 --- a/platforms/posix/src/px4_daemon/client.cpp +++ b/platforms/posix/src/px4_daemon/client.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2016 PX4 Development Team. All rights reserved. + * Copyright (C) 2016, 2018 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -35,13 +35,15 @@ * * @author Julian Oes <julian@oes.ch> * @author Beat Küng <beat-kueng@gmx.net> + * @author Mara Bos <m-ou.se@m-ou.se> */ #include <errno.h> #include <stdio.h> #include <fcntl.h> -#include <signal.h> +#include <string.h> #include <sys/types.h> +#include <sys/socket.h> #include <sys/stat.h> #include <sys/un.h> @@ -53,60 +55,33 @@ namespace px4_daemon { - -namespace client -{ -static Client *_instance; -} - Client::Client(int instance_id) : - _uuid(0), - _client_send_pipe_fd(-1), + _fd(-1), _instance_id(instance_id) -{ - client::_instance = this; -} - -Client::~Client() -{ - client::_instance = nullptr; -} +{} int -Client::generate_uuid() +Client::process_args(const int argc, const char **argv) { - int rand_fd = open("/dev/urandom", O_RDONLY); - - if (rand_fd < 0) { - PX4_ERR("open urandom"); - return rand_fd; - } - - int ret = 0; + std::string sock_path = get_socket_path(_instance_id); - int rand_read = read(rand_fd, &_uuid, sizeof(_uuid)); + _fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (rand_read != sizeof(_uuid)) { - PX4_ERR("rand read fail"); - ret = -errno; + if (_fd < 0) { + PX4_ERR("error creating socket"); + return -1; } - close(rand_fd); - return ret; -} - -int -Client::process_args(const int argc, const char **argv) -{ - // Prepare return pipe first to avoid a race. - int ret = _prepare_recv_pipe(); + sockaddr_un addr = {}; + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, sock_path.c_str(), sizeof(addr.sun_path) - 1); - if (ret != 0) { - PX4_ERR("Could not prepare recv pipe"); - return -2; + if (connect(_fd, (sockaddr *)&addr, sizeof(addr)) < 0) { + PX4_ERR("error connecting to socket"); + return -1; } - ret = _send_cmds(argc, argv); + int ret = _send_cmds(argc, argv); if (ret != 0) { PX4_ERR("Could not send commands"); @@ -116,70 +91,43 @@ Client::process_args(const int argc, const char **argv) return _listen(); } - -int -Client::_prepare_recv_pipe() -{ - int ret = get_client_recv_pipe_path(_uuid, _recv_pipe_path, sizeof(_recv_pipe_path)); - - if (ret < 0) { - PX4_ERR("failed to assemble path"); - return ret; - } - - ret = mkfifo(_recv_pipe_path, 0666); - - if (ret < 0) { - PX4_ERR("pipe %s already exists, errno: %d, %s", _recv_pipe_path, errno, strerror(errno)); - return ret; - } - - return 0; -} - int Client::_send_cmds(const int argc, const char **argv) { - // Send the command to server. - client_send_packet_s packet; - packet.header.msg_id = client_send_packet_s::message_header_s::e_msg_id::EXECUTE; - packet.header.client_uuid = _uuid; - packet.payload.execute_msg.is_atty = isatty(STDOUT_FILENO); - - // Concat arguments to send them. std::string cmd_buf; for (int i = 0; i < argc; ++i) { cmd_buf += argv[i]; if (i + 1 != argc) { + // TODO: Use '\0' as argument separator (and parse this server-side as well), + // so (quoted) whitespace within arguments doesn't get lost. cmd_buf += " "; } } - if (cmd_buf.size() >= sizeof(packet.payload.execute_msg.cmd)) { - PX4_ERR("commmand too long"); - return -1; - } + // Last byte is 'isatty'. + cmd_buf.push_back(isatty(STDOUT_FILENO)); - strcpy((char *)packet.payload.execute_msg.cmd, cmd_buf.c_str()); + size_t n = cmd_buf.size(); + const char *buf = cmd_buf.data(); - // The size is +1 because we want to include the null termination. - packet.header.payload_length = cmd_buf.size() + 1 + sizeof(packet.payload.execute_msg.is_atty); + while (n > 0) { + int n_sent = write(_fd, buf, n); - _client_send_pipe_fd = open(get_client_send_pipe_path(_instance_id).c_str(), O_WRONLY); + if (n_sent < 0) { + PX4_ERR("write() failed: %s", strerror(errno)); + return -1; + } - if (_client_send_pipe_fd < 0) { - PX4_ERR("pipe open fail (%i)", errno); - return _client_send_pipe_fd; + n -= n_sent; + buf += n_sent; } - int bytes_to_send = get_client_send_packet_length(&packet); - int bytes_sent = write(_client_send_pipe_fd, &packet, bytes_to_send); - - if (bytes_sent != bytes_to_send) { - PX4_ERR("write fail (%i)", errno); - return bytes_sent; + // Let the server know we're done writing. + if (shutdown(_fd, SHUT_WR) < 0) { + PX4_ERR("shutdown() failed: %s", strerror(errno)); + return -1; } return 0; @@ -188,159 +136,57 @@ Client::_send_cmds(const int argc, const char **argv) int Client::_listen() { - int client_recv_pipe_fd = open(_recv_pipe_path, O_RDONLY); - - if (client_recv_pipe_fd < 0) { - PX4_ERR("open failed, errno: %d, %s", errno, strerror(errno)); - } - - bool exit_loop = false; - int exit_arg = 0; - - while (!exit_loop) { - - // We only read as much as we need, otherwise we might get out of - // sync with packets. - client_recv_packet_s packet_recv; - int bytes_read = read(client_recv_pipe_fd, &packet_recv, sizeof(client_recv_packet_s::header)); - - if (bytes_read > 0) { - - // Using the header we can determine how big the payload is. - int payload_to_read = sizeof(packet_recv) - - sizeof(packet_recv.header) - - sizeof(packet_recv.payload) - + packet_recv.header.payload_length; - - // Again, we only read as much as we need because otherwise we need - // hold a buffer and parse it. - bytes_read = read(client_recv_pipe_fd, ((uint8_t *)&packet_recv) + bytes_read, payload_to_read); - - if (bytes_read > 0) { - - int retval = 0; - bool should_exit = false; - - int parse_ret = _parse_client_recv_packet(packet_recv, retval, should_exit); - - if (parse_ret != 0) { - PX4_ERR("retval could not be parsed"); - exit_arg = -1; - - } else { - exit_arg = retval; - } - - exit_loop = should_exit; - - } else if (bytes_read == 0) { - exit_arg = 0; - exit_loop = true; + char buffer[1024]; + int n_buffer_used = 0; + + // The response ends in {0, retval}. So when we detect a 0, or a 0 followed + // by another byte, we don't output it yet, until we know whether it was + // the end of the stream or not. + while (true) { + int n_read = read(_fd, buffer + n_buffer_used, sizeof buffer - n_buffer_used); + + if (n_read < 0) { + PX4_ERR("unable to read from socket"); + return -1; + + } else if (n_read == 0) { + if (n_buffer_used == 2) { + return buffer[1]; + + } else { + // Missing return value at end of stream. Stream was abruptly ended. + return -1; } - } else if (bytes_read == 0) { - // 0 means the pipe has been closed by all clients. - exit_arg = 0; - exit_loop = true; + } else { + n_read += n_buffer_used; + + if (n_read >= 2 && buffer[n_read - 2] == 0) { + // If the buffer ends in {0, retval}, keep it. + fwrite(buffer, n_read - 2, 1, stdout); + buffer[0] = 0; + buffer[1] = buffer[n_read - 1]; + n_buffer_used = 2; + + } else if (n_read >= 1 && buffer[n_read - 1] == 0) { + // If the buffer ends in a 0-byte, keep it. + fwrite(buffer, n_read - 1, 1, stdout); + buffer[0] = 0; + n_buffer_used = 1; + + } else { + fwrite(buffer, n_read, 1, stdout); + n_buffer_used = 0; + } } } - - close(_client_send_pipe_fd); - return exit_arg; -} - -int -Client::_parse_client_recv_packet(const client_recv_packet_s &packet, int &retval, bool &should_exit) -{ - switch (packet.header.msg_id) { - case client_recv_packet_s::message_header_s::e_msg_id::RETVAL: - - should_exit = true; - return _retval_cmd_packet(packet, retval); - - case client_recv_packet_s::message_header_s::e_msg_id::STDOUT: - - should_exit = false; - return _stdout_msg_packet(packet); - - default: - should_exit = true; - PX4_ERR("recv msg_id not handled: %d", (int)packet.header.msg_id); - return -1; - } } -int -Client::_retval_cmd_packet(const client_recv_packet_s &packet, int &retval) -{ - if (packet.header.payload_length == sizeof(packet.payload.retval_msg.retval)) { - retval = packet.payload.retval_msg.retval; - return 0; - - } else { - PX4_ERR("payload size wrong"); - return -1; - } -} - -int -Client::_stdout_msg_packet(const client_recv_packet_s &packet) -{ - if (packet.header.payload_length <= sizeof(packet.payload.stdout_msg.text)) { - - printf("%s", packet.payload.stdout_msg.text); - - return 0; - - } else { - PX4_ERR("payload size wrong (%i > %zu)", packet.header.payload_length, sizeof(packet.payload.stdout_msg.text)); - return -1; - } -} - -void -Client::register_sig_handler() -{ - // Register handlers for Ctrl+C to kill the thread if something hangs. - struct sigaction sig_int {}; - sig_int.sa_handler = Client::_static_sig_handler; - - // Without the flag SA_RESTART, we can't use open() after Ctrl+C has - // been pressed, and we can't wait for the return value from the - // cancelled command. - sig_int.sa_flags = SA_RESTART; - sigaction(SIGINT, &sig_int, nullptr); - sigaction(SIGTERM, &sig_int, nullptr); -} - -void -Client::_static_sig_handler(int sig_num) -{ - client::_instance->_sig_handler(sig_num); -} - -void -Client::_sig_handler(int sig_num) +Client::~Client() { - client_send_packet_s packet; - packet.header.msg_id = client_send_packet_s::message_header_s::e_msg_id::KILL; - packet.header.client_uuid = _uuid; - packet.payload.kill_msg.cmd_id = sig_num; - packet.header.payload_length = sizeof(packet.payload.kill_msg.cmd_id); - - if (_client_send_pipe_fd < 0) { - PX4_ERR("pipe open fail"); - system_exit(-1); - } - - int bytes_to_send = get_client_send_packet_length(&packet); - int bytes_sent = write(_client_send_pipe_fd, &packet, bytes_to_send); - - if (bytes_sent != bytes_to_send) { - PX4_ERR("write fail"); - system_exit(-1); + if (_fd >= 0) { + close(_fd); } } } // namespace px4_daemon - diff --git a/platforms/posix/src/px4_daemon/client.h b/platforms/posix/src/px4_daemon/client.h index 2e454c4cc2..41f4cb8381 100644 --- a/platforms/posix/src/px4_daemon/client.h +++ b/platforms/posix/src/px4_daemon/client.h @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2016 PX4 Development Team. All rights reserved. + * Copyright (C) 2016, 2018 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -33,21 +33,22 @@ /** * @file client.h * - * The client can write a command to the pipe that is supplied by the server. - * It will then open another pipe with its own UUID encoded and listen for - * stdout of the process that it started and the return value. + * The client can connect and write a command to the socket that is supplied by + * the server. It will then close its half of the connection, and read back the + * stdout stream of the process that it started, followed by its return value. * - * It the client receives a signal (e.g. Ctrl+C) it will catch it and send it - * as a message to the server in order to terminate the thread. + * It the client dies, the connection gets closed automatically and the corresponding + * thread in the server gets cancelled. * * @author Julian Oes <julian@oes.ch> * @author Beat Küng <beat-kueng@gmx.net> + * @author Mara Bos <m-ou.se@m-ou.se> */ #pragma once #include <stdint.h> -#include "pipe_protocol.h" +#include "sock_protocol.h" namespace px4_daemon { @@ -57,19 +58,14 @@ class Client { public: Client(int instance_id = 0); + ~Client(); - /** - * Initialize the unique ID of the client. - * - * @return 0 on success. - */ - int generate_uuid(); - - /** - * Make sure to catch signals in order to forward them to the server. - */ - void register_sig_handler(); + Client(Client &&other) : _fd(other._fd), _instance_id(other._instance_id) + { + // Steal the fd from the moved-from client. + other._fd = -1; + } /** * Process the supplied command line arguments and send them to server. @@ -81,22 +77,10 @@ public: int process_args(const int argc, const char **argv); private: - int _prepare_recv_pipe(); int _send_cmds(const int argc, const char **argv); int _listen(); - int _parse_client_recv_packet(const client_recv_packet_s &packet, int &retval, bool &should_exit); - - int _retval_cmd_packet(const client_recv_packet_s &packet, int &retval); - int _stdout_msg_packet(const client_recv_packet_s &packet); - - - static void _static_sig_handler(int sig_num); - void _sig_handler(int sig_num); - - uint64_t _uuid; - int _client_send_pipe_fd; - char _recv_pipe_path[RECV_PIPE_PATH_LEN]; + int _fd; int _instance_id; ///< instance ID for running multiple instances of the px4 server }; diff --git a/platforms/posix/src/px4_daemon/server.cpp b/platforms/posix/src/px4_daemon/server.cpp index 83fd4a0cae..f9f5b56854 100644 --- a/platforms/posix/src/px4_daemon/server.cpp +++ b/platforms/posix/src/px4_daemon/server.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2016 PX4 Development Team. All rights reserved. + * Copyright (C) 2016, 2018 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -35,29 +35,27 @@ * * @author Julian Oes <julian@oes.ch> * @author Beat Küng <beat-kueng@gmx.net> + * @author Mara Bos <m-ou.se@m-ou.se> */ +#include <errno.h> #include <fcntl.h> #include <unistd.h> +#include <string.h> #include <string> #include <pthread.h> #include <poll.h> #include <sys/stat.h> +#include <sys/socket.h> #include <sys/types.h> +#include <sys/un.h> +#include <vector> #include <px4_log.h> #include "pxh.h" #include "server.h" -// In Cygwin opening and closing the same named pipe multiple times within one process doesn't work POSIX compliant. -// As a workaround we open the client send pipe file in read write mode such that we can keep it open all the time. -#if !defined(__PX4_CYGWIN) -#define CLIENT_SEND_PIPE_OFLAGS O_RDONLY -#else -#define CLIENT_SEND_PIPE_OFLAGS O_RDWR -#endif - namespace px4_daemon { @@ -78,22 +76,37 @@ Server::~Server() int Server::start() { - std::string client_send_pipe_path = get_client_send_pipe_path(_instance_id); + std::string sock_path = get_socket_path(_instance_id); + + // Delete socket in case it exists already. + unlink(sock_path.c_str()); + + _fd = socket(AF_UNIX, SOCK_STREAM, 0); + + if (_fd < 0) { + PX4_ERR("error creating socket"); + return -1; + } - // Delete pipe in case it exists already. - unlink(client_send_pipe_path.c_str()); + sockaddr_un addr = {}; + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, sock_path.c_str(), sizeof(addr.sun_path) - 1); - // Create new pipe to listen to clients. - // This needs to happen before we return from this method, so that the caller can launch clients. - mkfifo(client_send_pipe_path.c_str(), 0666); + if (bind(_fd, (sockaddr *)&addr, sizeof(addr)) < 0) { + PX4_ERR("error binding socket"); + return -1; + } + + if (listen(_fd, 10) < 0) { + PX4_ERR("error listing to socket"); + return -1; + } if (0 != pthread_create(&_server_main_pthread, nullptr, _server_main_trampoline, - nullptr)) { - + this)) { PX4_ERR("error creating client handler thread"); - return -1; } @@ -101,12 +114,9 @@ Server::start() } void * -Server::_server_main_trampoline(void *arg) +Server::_server_main_trampoline(void *self) { - if (_instance) { - _instance->_server_main(arg); - } - + ((Server *)self)->_server_main(); return nullptr; } @@ -116,9 +126,8 @@ void Server::_pthread_key_destructor(void *arg) } void -Server::_server_main(void *arg) +Server::_server_main() { - // Set thread specific pipe to supplied file descriptor. int ret = pthread_key_create(&_key, _pthread_key_destructor); if (ret != 0) { @@ -126,241 +135,151 @@ Server::_server_main(void *arg) return; } - std::string client_send_pipe_path = get_client_send_pipe_path(_instance_id); - int client_send_pipe_fd = open(client_send_pipe_path.c_str(), CLIENT_SEND_PIPE_OFLAGS); + // The list of file descriptors to watch. + std::vector<pollfd> poll_fds; - while (true) { + // Watch the listening socket for incoming connections. + poll_fds.push_back(pollfd {_fd, POLLIN, 0}); - client_send_packet_s packet; + while (true) { + int n_ready = poll(poll_fds.data(), poll_fds.size(), -1); - // We only read as much as we need, otherwise we might get out of - // sync with packets. - int bytes_read = read(client_send_pipe_fd, &packet, sizeof(client_send_packet_s::header)); + if (n_ready < 0) { + PX4_ERR("poll() failed: %s", strerror(errno)); + return; + } - if (bytes_read > 0) { + _lock(); - // Using the header we can determine how big the payload is. - int payload_to_read = sizeof(packet) - - sizeof(packet.header) - - sizeof(packet.payload) - + packet.header.payload_length; + // Handle any new connections. + if (poll_fds[0].revents & POLLIN) { + --n_ready; + int client = accept(_fd, nullptr, nullptr); - // Again, we only read as much as we need because otherwise we need - // hold a buffer and parse it. - bytes_read = read(client_send_pipe_fd, ((uint8_t *)&packet) + bytes_read, payload_to_read); + if (client == -1) { + PX4_ERR("failed to accept client: %s", strerror(errno)); + _unlock(); + return; + } - if (bytes_read > 0) { + // Start a new thread to handle the client. + pthread_t *thread = &_fd_to_thread[client]; + ret = pthread_create(thread, nullptr, Server::_handle_client, (void *)(intptr_t)client); - _parse_client_send_packet(packet); + if (ret != 0) { + PX4_ERR("could not start pthread (%i)", ret); + _fd_to_thread.erase(client); + close(client); + } else { + // We won't join the thread, so detach to automatically release resources at its end + pthread_detach(*thread); } - } - if (bytes_read == 0) { - // 0 means the pipe has been closed by all clients - // and we need to re-open it. - close(client_send_pipe_fd); - client_send_pipe_fd = open(client_send_pipe_path.c_str(), O_RDONLY); + // Start listening for the client hanging up. + poll_fds.push_back(pollfd {client, POLLHUP, 0}); } - } - close(client_send_pipe_fd); -} + // Handle any closed connections. + for (size_t i = 1; n_ready > 0 && i < poll_fds.size();) { + if (poll_fds[i].revents) { + --n_ready; + auto thread = _fd_to_thread.find(poll_fds[i].fd); -void -Server::_parse_client_send_packet(const client_send_packet_s &packet) -{ - switch (packet.header.msg_id) { - case client_send_packet_s::message_header_s::e_msg_id::EXECUTE: - _execute_cmd_packet(packet); - break; - - case client_send_packet_s::message_header_s::e_msg_id::KILL: - _kill_cmd_packet(packet); - break; - - default: - PX4_ERR("send msg_id not handled"); - break; - } -} + if (thread != _fd_to_thread.end()) { + // Thread is still running, so we cancel it. + // TODO: use a more graceful exit method to avoid resource leaks + pthread_cancel(thread->second); + _fd_to_thread.erase(thread); + } -void -Server::_execute_cmd_packet(const client_send_packet_s &packet) -{ - if (packet.header.payload_length == 0) { - PX4_ERR("command length 0"); - return; - } + close(poll_fds[i].fd); + poll_fds.erase(poll_fds.begin() + i); - // We open the client's specific pipe to write the return value and stdout back to. - // The pipe's path is created knowing the UUID of the client. - char path[RECV_PIPE_PATH_LEN]; - int ret = get_client_recv_pipe_path(packet.header.client_uuid, path, RECV_PIPE_PATH_LEN); - - if (ret < 0) { - PX4_ERR("failed to assemble path"); - return; - } - - int pipe_fd = open(path, O_WRONLY); - - if (pipe_fd < 0) { - PX4_ERR("pipe open fail"); - return; - } - - // To execute a command we start a new thread. - pthread_t new_pthread; - - // We need to copy everything that the new thread needs because we will go - // out of scope. - RunCmdArgs *args = new RunCmdArgs; - strncpy(args->cmd, (char *)packet.payload.execute_msg.cmd, sizeof(args->cmd)); - args->client_uuid = packet.header.client_uuid; - args->pipe_fd = pipe_fd; - args->is_atty = packet.payload.execute_msg.is_atty; - - _lock(); // need to lock, otherwise the thread could already exit before we insert into the map - ret = pthread_create(&new_pthread, nullptr, Server::_run_cmd, (void *)args); + } else { + ++i; + } + } - if (ret != 0) { - PX4_ERR("could not start pthread (%i)", ret); - delete args; - - } else { - // We won't join the thread, so detach to automatically release resources at its end - pthread_detach(new_pthread); - // We keep two maps for cleanup if a thread is finished or killed. - _client_uuid_to_pthread.insert(std::pair<uint64_t, pthread_t> - (packet.header.client_uuid, new_pthread)); - _pthread_to_pipe_fd.insert(std::pair<pthread_t, int> - (new_pthread, pipe_fd)); + _unlock(); } - _unlock(); + close(_fd); } - void -Server::_kill_cmd_packet(const client_send_packet_s &packet) +*Server::_handle_client(void *arg) { - _lock(); + int fd = (int)(intptr_t)arg; - // TODO: we currently ignore the signal type. - auto client_uuid_iter = _client_uuid_to_pthread.find(packet.header.client_uuid); + // Read until the end of the incoming stream. + std::string cmd; - if (client_uuid_iter == _client_uuid_to_pthread.end()) { - _unlock(); - return; - } - - pthread_t pthread_to_kill = client_uuid_iter->second; - - // TODO: use a more graceful exit method to avoid resource leaks - int ret = pthread_cancel(pthread_to_kill); + while (true) { + size_t n = cmd.size(); + cmd.resize(n + 1024); + ssize_t n_read = read(fd, &cmd[n], cmd.size() - n); - __cleanup_thread(packet.header.client_uuid); + if (n_read < 0) { + _cleanup(fd); + return nullptr; + } - _unlock(); + cmd.resize(n + n_read); - if (ret != 0) { - PX4_ERR("failed to cancel thread"); + if (n_read == 0) { + break; + } } - // We don't send retval when we get killed. - // The client knows this and just exits without confirmation. -} - - - -void -*Server::_run_cmd(void *arg) -{ - RunCmdArgs *args = (RunCmdArgs *)arg; - - // Copy arguments so that we can cleanup the arg structure. - uint64_t client_uuid = args->client_uuid; - int pipe_fd = args->pipe_fd; - bool is_atty = args->is_atty; - std::string message_str(args->cmd); + if (cmd.size() < 2) { + _cleanup(fd); + return nullptr; + } - // Clean up the args from the heap in case the thread gets canceled - // from outside. - delete args; + // Last byte is 'isatty'. + uint8_t isatty = cmd.back(); + cmd.pop_back(); // We register thread specific data. This is used for PX4_INFO (etc.) log calls. CmdThreadSpecificData *thread_data_ptr; if ((thread_data_ptr = (CmdThreadSpecificData *)pthread_getspecific(_instance->_key)) == nullptr) { thread_data_ptr = new CmdThreadSpecificData; - thread_data_ptr->pipe_fd = pipe_fd; - thread_data_ptr->is_atty = is_atty; - thread_data_ptr->packet.header.msg_id = client_recv_packet_s::message_header_s::e_msg_id::STDOUT; + thread_data_ptr->fd = fd; + thread_data_ptr->is_atty = isatty; (void)pthread_setspecific(_instance->_key, (void *)thread_data_ptr); } // Run the actual command. - int retval = Pxh::process_line(message_str, true); + int retval = Pxh::process_line(cmd, true); // Report return value. - _send_retval(pipe_fd, retval, client_uuid); - - // Clean up before returning. - _instance->_cleanup_thread(client_uuid); + char buf[2] = {0, (char)retval}; - return nullptr; -} - - -void -Server::_send_retval(const int pipe_fd, const int retval, const uint64_t client_uuid) -{ - client_recv_packet_s packet; - - packet.header.msg_id = client_recv_packet_s::message_header_s::e_msg_id::RETVAL; - packet.header.payload_length = sizeof(packet.payload.retval_msg); - packet.payload.retval_msg.retval = retval; - - int bytes_to_send = get_client_recv_packet_length(&packet); - int bytes_sent = write(pipe_fd, &packet, bytes_to_send); - - if (bytes_sent != bytes_to_send) { - printf("write fail\n"); - return; + if (write(fd, buf, sizeof buf) < 0) { + // Don't care it went wrong, as we're cleaning up anyway. } -} -void -Server::_cleanup_thread(const uint64_t client_uuid) -{ - _lock(); - __cleanup_thread(client_uuid); - _unlock(); + _cleanup(fd); + return nullptr; } void -Server::__cleanup_thread(const uint64_t client_uuid) +Server::_cleanup(int fd) { - pthread_t pthread_killed = _client_uuid_to_pthread[client_uuid]; - auto pipe_iter = _pthread_to_pipe_fd.find(pthread_killed); - - if (pipe_iter == _pthread_to_pipe_fd.end()) { - // can happen if the thread already exited and then got a kill packet - PX4_DEBUG("pipe fd already closed"); - return; - } - - int pipe_fd = pipe_iter->second; - close(pipe_fd); - - char path[RECV_PIPE_PATH_LEN] = {}; - get_client_recv_pipe_path(client_uuid, path, RECV_PIPE_PATH_LEN); - unlink(path); - - _client_uuid_to_pthread.erase(client_uuid); - _pthread_to_pipe_fd.erase(pthread_killed); + _instance->_lock(); + _instance->_fd_to_thread.erase(fd); + _instance->_unlock(); + + // We can't close() the fd here, since the main thread is probably + // polling for it: close()ing it causes a race condition. + // So, we only call shutdown(), which causes the main thread to register a + // 'POLLHUP', such that the main thread can close() it for us. + // We already removed this thread from _fd_to_thread, so there is no risk + // of the main thread trying to cancel this thread after it already exited. + shutdown(fd, SHUT_RDWR); } } //namespace px4_daemon diff --git a/platforms/posix/src/px4_daemon/server.h b/platforms/posix/src/px4_daemon/server.h index e1e0f8b845..1ce8d07675 100644 --- a/platforms/posix/src/px4_daemon/server.h +++ b/platforms/posix/src/px4_daemon/server.h @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2016 PX4 Development Team. All rights reserved. + * Copyright (C) 2016, 2018 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -33,12 +33,11 @@ /** * @file server.h * - * The server (also called daemon) opens a pipe for clients to write to. + * The server (also called daemon) opens a socket for clients to connect to. * - * Once a client connects it will send a command as well as a unique ID. + * Once a client connects it will send a command and close its side of the connection. * The server will return the stdout of the executing command, as well as the return - * value to the client on a client specific pipe. - * The client specific pipe are idenified by the unique ID of the client. + * value to the client. * * There should only every be one server running, therefore the static instance. * The Singleton implementation is not complete, but it should be obvious not @@ -46,6 +45,7 @@ * * @author Julian Oes <julian@oes.ch> * @author Beat Küng <beat-kueng@gmx.net> + * @author Mara Bos <m-ou.se@m-ou.se> */ #pragma once @@ -54,7 +54,7 @@ #include <pthread.h> #include <map> -#include "pipe_protocol.h" +#include "sock_protocol.h" namespace px4_daemon @@ -74,11 +74,10 @@ public: */ int start(); - struct CmdThreadSpecificData { - int pipe_fd; // pipe fd to send data to descriptor + int fd; // fd to send stdout to bool is_atty; // whether file descriptor refers to a terminal - client_recv_packet_s packet; + char buffer[1024]; }; static bool is_running() @@ -92,17 +91,7 @@ public: } private: static void *_server_main_trampoline(void *arg); - void _server_main(void *arg); - - void _parse_client_send_packet(const client_send_packet_s &packet); - void _execute_cmd_packet(const client_send_packet_s &packet); - void _kill_cmd_packet(const client_send_packet_s &packet); - void _cleanup_thread(const uint64_t client_uuid); - - /** - * Like _cleanup_thread(), but does not take the mutex - */ - void __cleanup_thread(const uint64_t client_uuid); + void _server_main(); void _lock() { @@ -114,28 +103,19 @@ private: pthread_mutex_unlock(&_mutex); } - - static void _send_retval(const int pipe_fd, const int retval, const uint64_t client_uuid); - - struct RunCmdArgs { - char cmd[sizeof(client_send_packet_s::payload.execute_msg.cmd)]; - uint64_t client_uuid; - bool is_atty; - int pipe_fd; - }; - - static void *_run_cmd(void *arg); + static void *_handle_client(void *arg); + static void _cleanup(int fd); pthread_t _server_main_pthread; - std::map<pthread_t, int> _pthread_to_pipe_fd; - std::map<uint64_t, pthread_t> _client_uuid_to_pthread; - pthread_mutex_t _mutex; ///< protects access to _pthread_to_pipe_fd and _client_uuid_to_pthread + std::map<int, pthread_t> _fd_to_thread; + pthread_mutex_t _mutex; ///< Protects _fd_to_thread. pthread_key_t _key; int _instance_id; ///< instance ID for running multiple instances of the px4 server + int _fd; static void _pthread_key_destructor(void *arg); diff --git a/platforms/posix/src/px4_daemon/server_io.cpp b/platforms/posix/src/px4_daemon/server_io.cpp index 72564b5c88..ba801053da 100644 --- a/platforms/posix/src/px4_daemon/server_io.cpp +++ b/platforms/posix/src/px4_daemon/server_io.cpp @@ -50,7 +50,7 @@ #include "server.h" #include <px4_daemon/server_io.h> -#include "pipe_protocol.h" +#include "sock_protocol.h" using namespace px4_daemon; @@ -84,10 +84,8 @@ int get_stdout_pipe_buffer(char **buffer, unsigned *max_length, bool *is_atty) #endif - client_recv_packet_s *packet = &thread_data_ptr->packet; - - *buffer = (char *)packet->payload.stdout_msg.text; - *max_length = sizeof(packet->payload.stdout_msg.text); + *buffer = thread_data_ptr->buffer; + *max_length = sizeof(thread_data_ptr->buffer); *is_atty = thread_data_ptr->is_atty; return 0; @@ -95,8 +93,6 @@ int get_stdout_pipe_buffer(char **buffer, unsigned *max_length, bool *is_atty) int send_stdout_pipe_buffer(unsigned buffer_length) { - assert(buffer_length <= sizeof(client_recv_packet_s::payload.stdout_msg.text)); - Server::CmdThreadSpecificData *thread_data_ptr; if (!Server::is_running()) { @@ -108,25 +104,9 @@ int send_stdout_pipe_buffer(unsigned buffer_length) return -1; } - client_recv_packet_s *packet = &thread_data_ptr->packet; - packet->header.payload_length = buffer_length; - - int pipe_fd = thread_data_ptr->pipe_fd; - int bytes_to_send = get_client_recv_packet_length(packet); - - // Check if we can write first by writing 0 bytes. - // If we don't do this, we'll get SIGPIPE and be very unhappy - // because the whole process will go down. - int ret = write(pipe_fd, nullptr, 0); - - if (ret == 0 && errno == EPIPE) { - printf("Error: can't write to closed pipe, giving up.\n"); - pthread_exit(nullptr); - } - - int bytes_sent = write(pipe_fd, packet, bytes_to_send); + int bytes_sent = write(thread_data_ptr->fd, thread_data_ptr->buffer, buffer_length); - if (bytes_sent != bytes_to_send) { + if (bytes_sent != (int)buffer_length) { printf("write fail\n"); return -1; } diff --git a/platforms/posix/src/px4_daemon/pipe_protocol.cpp b/platforms/posix/src/px4_daemon/sock_protocol.cpp similarity index 62% rename from platforms/posix/src/px4_daemon/pipe_protocol.cpp rename to platforms/posix/src/px4_daemon/sock_protocol.cpp index 57bc95ebfb..65b9e18000 100644 --- a/platforms/posix/src/px4_daemon/pipe_protocol.cpp +++ b/platforms/posix/src/px4_daemon/sock_protocol.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2016 PX4 Development Team. All rights reserved. + * Copyright (C) 2018 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -31,44 +31,20 @@ * ****************************************************************************/ /** - * @file pipe_protocol.cpp + * @file sock_protocol.cpp * - * @author Julian Oes <julian@oes.ch> - * @author Beat Küng <beat-kueng@gmx.net> + * @author Mara Bos <m-ou.se@m-ou.se> */ -#include <stdint.h> -#include <stdio.h> -#include <inttypes.h> - -#include "pipe_protocol.h" - -static const char CLIENT_SEND_PIPE_PATH[] = "/tmp/px4_client_send_pipe-"; -static const char CLIENT_RECV_PIPE_PATH[] = "/tmp/px4_client_recv_pipe"; - +#include "sock_protocol.h" namespace px4_daemon { - -unsigned get_client_send_packet_length(const client_send_packet_s *packet) -{ - return sizeof(client_send_packet_s) - sizeof(packet->payload) + packet->header.payload_length; -} - -unsigned get_client_recv_packet_length(const client_recv_packet_s *packet) -{ - return sizeof(client_recv_packet_s) - sizeof(packet->payload) + packet->header.payload_length; -} - -int get_client_recv_pipe_path(const uint64_t uuid, char *path, const size_t path_len) -{ - return snprintf(path, path_len, "%s-%016" PRIx64, CLIENT_RECV_PIPE_PATH, uuid); -} - -std::string get_client_send_pipe_path(int instance_id) +std::string get_socket_path(int instance_id) { - return std::string(CLIENT_SEND_PIPE_PATH) + std::to_string(instance_id); + // TODO: Use /var/run/px4/$instance/sock (or /var/run/user/$UID/... for non-root). + return "/tmp/px4-sock-" + std::to_string(instance_id); } } // namespace px4_daemon diff --git a/platforms/posix/src/px4_daemon/pipe_protocol.h b/platforms/posix/src/px4_daemon/sock_protocol.h similarity index 52% rename from platforms/posix/src/px4_daemon/pipe_protocol.h rename to platforms/posix/src/px4_daemon/sock_protocol.h index 0f8012cdfd..f158a29b66 100644 --- a/platforms/posix/src/px4_daemon/pipe_protocol.h +++ b/platforms/posix/src/px4_daemon/sock_protocol.h @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (C) 2016 PX4 Development Team. All rights reserved. + * Copyright (C) 2018 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -31,69 +31,18 @@ * ****************************************************************************/ /** - * @file pipe_protocol.h + * @file sock_protocol.h * - * @author Julian Oes <julian@oes.ch> - * @author Beat Küng <beat-kueng@gmx.net> + * @author Mara Bos <m-ou.se@m-ou.se> */ #pragma once -#include <stdint.h> #include <string> namespace px4_daemon { -static const unsigned RECV_PIPE_PATH_LEN = 64; - -// The packet size is no more than 512 bytes, because that is the minimum guaranteed size -// for a pipe to avoid interleaving of messages when multiple clients write at the same time -// (atomic writes). -struct client_send_packet_s { - struct message_header_s { - uint64_t client_uuid; - enum class e_msg_id : int { - EXECUTE, - KILL - } msg_id; - unsigned payload_length; - } header; - - union { - struct execute_msg_s { - uint8_t is_atty; - uint8_t cmd[512 - sizeof(message_header_s) - sizeof(uint8_t)]; - } execute_msg; - struct kill_msg_s { - int cmd_id; - } kill_msg; - } payload; -}; - -// We have per client receiver a pipe with the uuid in its file path. -struct client_recv_packet_s { - struct message_header_s { - enum class e_msg_id : int { - RETVAL, - STDOUT - } msg_id; - unsigned payload_length; - } header; - - union { - struct retval_msg_s { - int retval; - } retval_msg; - struct stdout_msg_s { - uint8_t text[512 - sizeof(message_header_s)]; ///< null-terminated string (payload_length includes the null) - } stdout_msg; - } payload; -}; - -unsigned get_client_send_packet_length(const client_send_packet_s *packet); -unsigned get_client_recv_packet_length(const client_recv_packet_s *packet); -int get_client_recv_pipe_path(const uint64_t uuid, char *path, const size_t path_len); -std::string get_client_send_pipe_path(int instance_id); +std::string get_socket_path(int instance_id); } // namespace px4_daemon