Browse Source

Use unix sockets instead of pipes for posix daemon. (#10766)

Unlike pipes, unix sockets provide bi-directional
communication with each connected client.

- No need to generate a unique uuid per client anymore.

- The client doesn't have to create its own pipe anymore.

- Since there is no risk of multiple client's writes getting mixed up,
  messages don't need to fit in a single write anymore, removing the
  limit on command length.

- Since the server can detect a connection closing, the client no longer
  needs to handle signals. When the client is killed, the connection is
  automatically closed, which will cause the server to kill the related
  px4 thread.
  Since this does not rely on handling signals and the client sending an
  additional message, this is much more reliable.

- Client is no longer a singleton.

- The protocol is simplified. Standard output is directly written to the
  socket back to the client, without wrapping it in any protocol
  message.

- Because of the simple protocol, one could now even use netcat to run a
  px4 command:

    $ echo hello | netcat -UN /tmp/px4-sock-0

Also removes a few race conditions.
sbg
Mara Bos 6 years ago committed by Daniel Agar
parent
commit
9594ebf72e
  1. 2
      platforms/posix/src/main.cpp
  2. 2
      platforms/posix/src/px4_daemon/CMakeLists.txt
  3. 314
      platforms/posix/src/px4_daemon/client.cpp
  4. 46
      platforms/posix/src/px4_daemon/client.h
  5. 337
      platforms/posix/src/px4_daemon/server.cpp
  6. 48
      platforms/posix/src/px4_daemon/server.h
  7. 30
      platforms/posix/src/px4_daemon/server_io.cpp
  8. 38
      platforms/posix/src/px4_daemon/sock_protocol.cpp
  9. 59
      platforms/posix/src/px4_daemon/sock_protocol.h

2
platforms/posix/src/main.cpp

@ -173,8 +173,6 @@ int main(int argc, char **argv) @@ -173,8 +173,6 @@ int main(int argc, char **argv)
argv[0] += path_length + strlen(prefix);
px4_daemon::Client client(instance);
client.generate_uuid();
client.register_sig_handler();
return client.process_args(argc, (const char **)argv);
} else {

2
platforms/posix/src/px4_daemon/CMakeLists.txt

@ -37,6 +37,6 @@ px4_add_library(px4_daemon @@ -37,6 +37,6 @@ px4_add_library(px4_daemon
client.cpp
server.cpp
server_io.cpp
pipe_protocol.cpp
sock_protocol.cpp
)

314
platforms/posix/src/px4_daemon/client.cpp

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2016 PX4 Development Team. All rights reserved.
* Copyright (C) 2016, 2018 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -35,13 +35,15 @@ @@ -35,13 +35,15 @@
*
* @author Julian Oes <julian@oes.ch>
* @author Beat Küng <beat-kueng@gmx.net>
* @author Mara Bos <m-ou.se@m-ou.se>
*/
#include <errno.h>
#include <stdio.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
@ -53,60 +55,33 @@ @@ -53,60 +55,33 @@
namespace px4_daemon
{
namespace client
{
static Client *_instance;
}
Client::Client(int instance_id) :
_uuid(0),
_client_send_pipe_fd(-1),
_fd(-1),
_instance_id(instance_id)
{
client::_instance = this;
}
Client::~Client()
{
client::_instance = nullptr;
}
{}
int
Client::generate_uuid()
Client::process_args(const int argc, const char **argv)
{
int rand_fd = open("/dev/urandom", O_RDONLY);
if (rand_fd < 0) {
PX4_ERR("open urandom");
return rand_fd;
}
int ret = 0;
std::string sock_path = get_socket_path(_instance_id);
int rand_read = read(rand_fd, &_uuid, sizeof(_uuid));
_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (rand_read != sizeof(_uuid)) {
PX4_ERR("rand read fail");
ret = -errno;
if (_fd < 0) {
PX4_ERR("error creating socket");
return -1;
}
close(rand_fd);
return ret;
}
int
Client::process_args(const int argc, const char **argv)
{
// Prepare return pipe first to avoid a race.
int ret = _prepare_recv_pipe();
sockaddr_un addr = {};
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, sock_path.c_str(), sizeof(addr.sun_path) - 1);
if (ret != 0) {
PX4_ERR("Could not prepare recv pipe");
return -2;
if (connect(_fd, (sockaddr *)&addr, sizeof(addr)) < 0) {
PX4_ERR("error connecting to socket");
return -1;
}
ret = _send_cmds(argc, argv);
int ret = _send_cmds(argc, argv);
if (ret != 0) {
PX4_ERR("Could not send commands");
@ -116,70 +91,43 @@ Client::process_args(const int argc, const char **argv) @@ -116,70 +91,43 @@ Client::process_args(const int argc, const char **argv)
return _listen();
}
int
Client::_prepare_recv_pipe()
{
int ret = get_client_recv_pipe_path(_uuid, _recv_pipe_path, sizeof(_recv_pipe_path));
if (ret < 0) {
PX4_ERR("failed to assemble path");
return ret;
}
ret = mkfifo(_recv_pipe_path, 0666);
if (ret < 0) {
PX4_ERR("pipe %s already exists, errno: %d, %s", _recv_pipe_path, errno, strerror(errno));
return ret;
}
return 0;
}
int
Client::_send_cmds(const int argc, const char **argv)
{
// Send the command to server.
client_send_packet_s packet;
packet.header.msg_id = client_send_packet_s::message_header_s::e_msg_id::EXECUTE;
packet.header.client_uuid = _uuid;
packet.payload.execute_msg.is_atty = isatty(STDOUT_FILENO);
// Concat arguments to send them.
std::string cmd_buf;
for (int i = 0; i < argc; ++i) {
cmd_buf += argv[i];
if (i + 1 != argc) {
// TODO: Use '\0' as argument separator (and parse this server-side as well),
// so (quoted) whitespace within arguments doesn't get lost.
cmd_buf += " ";
}
}
if (cmd_buf.size() >= sizeof(packet.payload.execute_msg.cmd)) {
PX4_ERR("commmand too long");
return -1;
}
// Last byte is 'isatty'.
cmd_buf.push_back(isatty(STDOUT_FILENO));
strcpy((char *)packet.payload.execute_msg.cmd, cmd_buf.c_str());
size_t n = cmd_buf.size();
const char *buf = cmd_buf.data();
// The size is +1 because we want to include the null termination.
packet.header.payload_length = cmd_buf.size() + 1 + sizeof(packet.payload.execute_msg.is_atty);
while (n > 0) {
int n_sent = write(_fd, buf, n);
_client_send_pipe_fd = open(get_client_send_pipe_path(_instance_id).c_str(), O_WRONLY);
if (n_sent < 0) {
PX4_ERR("write() failed: %s", strerror(errno));
return -1;
}
if (_client_send_pipe_fd < 0) {
PX4_ERR("pipe open fail (%i)", errno);
return _client_send_pipe_fd;
n -= n_sent;
buf += n_sent;
}
int bytes_to_send = get_client_send_packet_length(&packet);
int bytes_sent = write(_client_send_pipe_fd, &packet, bytes_to_send);
if (bytes_sent != bytes_to_send) {
PX4_ERR("write fail (%i)", errno);
return bytes_sent;
// Let the server know we're done writing.
if (shutdown(_fd, SHUT_WR) < 0) {
PX4_ERR("shutdown() failed: %s", strerror(errno));
return -1;
}
return 0;
@ -188,159 +136,57 @@ Client::_send_cmds(const int argc, const char **argv) @@ -188,159 +136,57 @@ Client::_send_cmds(const int argc, const char **argv)
int
Client::_listen()
{
int client_recv_pipe_fd = open(_recv_pipe_path, O_RDONLY);
if (client_recv_pipe_fd < 0) {
PX4_ERR("open failed, errno: %d, %s", errno, strerror(errno));
}
bool exit_loop = false;
int exit_arg = 0;
while (!exit_loop) {
// We only read as much as we need, otherwise we might get out of
// sync with packets.
client_recv_packet_s packet_recv;
int bytes_read = read(client_recv_pipe_fd, &packet_recv, sizeof(client_recv_packet_s::header));
if (bytes_read > 0) {
// Using the header we can determine how big the payload is.
int payload_to_read = sizeof(packet_recv)
- sizeof(packet_recv.header)
- sizeof(packet_recv.payload)
+ packet_recv.header.payload_length;
// Again, we only read as much as we need because otherwise we need
// hold a buffer and parse it.
bytes_read = read(client_recv_pipe_fd, ((uint8_t *)&packet_recv) + bytes_read, payload_to_read);
if (bytes_read > 0) {
int retval = 0;
bool should_exit = false;
int parse_ret = _parse_client_recv_packet(packet_recv, retval, should_exit);
if (parse_ret != 0) {
PX4_ERR("retval could not be parsed");
exit_arg = -1;
} else {
exit_arg = retval;
}
exit_loop = should_exit;
} else if (bytes_read == 0) {
exit_arg = 0;
exit_loop = true;
char buffer[1024];
int n_buffer_used = 0;
// The response ends in {0, retval}. So when we detect a 0, or a 0 followed
// by another byte, we don't output it yet, until we know whether it was
// the end of the stream or not.
while (true) {
int n_read = read(_fd, buffer + n_buffer_used, sizeof buffer - n_buffer_used);
if (n_read < 0) {
PX4_ERR("unable to read from socket");
return -1;
} else if (n_read == 0) {
if (n_buffer_used == 2) {
return buffer[1];
} else {
// Missing return value at end of stream. Stream was abruptly ended.
return -1;
}
} else if (bytes_read == 0) {
// 0 means the pipe has been closed by all clients.
exit_arg = 0;
exit_loop = true;
} else {
n_read += n_buffer_used;
if (n_read >= 2 && buffer[n_read - 2] == 0) {
// If the buffer ends in {0, retval}, keep it.
fwrite(buffer, n_read - 2, 1, stdout);
buffer[0] = 0;
buffer[1] = buffer[n_read - 1];
n_buffer_used = 2;
} else if (n_read >= 1 && buffer[n_read - 1] == 0) {
// If the buffer ends in a 0-byte, keep it.
fwrite(buffer, n_read - 1, 1, stdout);
buffer[0] = 0;
n_buffer_used = 1;
} else {
fwrite(buffer, n_read, 1, stdout);
n_buffer_used = 0;
}
}
}
close(_client_send_pipe_fd);
return exit_arg;
}
int
Client::_parse_client_recv_packet(const client_recv_packet_s &packet, int &retval, bool &should_exit)
{
switch (packet.header.msg_id) {
case client_recv_packet_s::message_header_s::e_msg_id::RETVAL:
should_exit = true;
return _retval_cmd_packet(packet, retval);
case client_recv_packet_s::message_header_s::e_msg_id::STDOUT:
should_exit = false;
return _stdout_msg_packet(packet);
default:
should_exit = true;
PX4_ERR("recv msg_id not handled: %d", (int)packet.header.msg_id);
return -1;
}
}
int
Client::_retval_cmd_packet(const client_recv_packet_s &packet, int &retval)
{
if (packet.header.payload_length == sizeof(packet.payload.retval_msg.retval)) {
retval = packet.payload.retval_msg.retval;
return 0;
} else {
PX4_ERR("payload size wrong");
return -1;
}
}
int
Client::_stdout_msg_packet(const client_recv_packet_s &packet)
{
if (packet.header.payload_length <= sizeof(packet.payload.stdout_msg.text)) {
printf("%s", packet.payload.stdout_msg.text);
return 0;
} else {
PX4_ERR("payload size wrong (%i > %zu)", packet.header.payload_length, sizeof(packet.payload.stdout_msg.text));
return -1;
}
}
void
Client::register_sig_handler()
{
// Register handlers for Ctrl+C to kill the thread if something hangs.
struct sigaction sig_int {};
sig_int.sa_handler = Client::_static_sig_handler;
// Without the flag SA_RESTART, we can't use open() after Ctrl+C has
// been pressed, and we can't wait for the return value from the
// cancelled command.
sig_int.sa_flags = SA_RESTART;
sigaction(SIGINT, &sig_int, nullptr);
sigaction(SIGTERM, &sig_int, nullptr);
}
void
Client::_static_sig_handler(int sig_num)
{
client::_instance->_sig_handler(sig_num);
}
void
Client::_sig_handler(int sig_num)
Client::~Client()
{
client_send_packet_s packet;
packet.header.msg_id = client_send_packet_s::message_header_s::e_msg_id::KILL;
packet.header.client_uuid = _uuid;
packet.payload.kill_msg.cmd_id = sig_num;
packet.header.payload_length = sizeof(packet.payload.kill_msg.cmd_id);
if (_client_send_pipe_fd < 0) {
PX4_ERR("pipe open fail");
system_exit(-1);
}
int bytes_to_send = get_client_send_packet_length(&packet);
int bytes_sent = write(_client_send_pipe_fd, &packet, bytes_to_send);
if (bytes_sent != bytes_to_send) {
PX4_ERR("write fail");
system_exit(-1);
if (_fd >= 0) {
close(_fd);
}
}
} // namespace px4_daemon

46
platforms/posix/src/px4_daemon/client.h

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2016 PX4 Development Team. All rights reserved.
* Copyright (C) 2016, 2018 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -33,21 +33,22 @@ @@ -33,21 +33,22 @@
/**
* @file client.h
*
* The client can write a command to the pipe that is supplied by the server.
* It will then open another pipe with its own UUID encoded and listen for
* stdout of the process that it started and the return value.
* The client can connect and write a command to the socket that is supplied by
* the server. It will then close its half of the connection, and read back the
* stdout stream of the process that it started, followed by its return value.
*
* It the client receives a signal (e.g. Ctrl+C) it will catch it and send it
* as a message to the server in order to terminate the thread.
* It the client dies, the connection gets closed automatically and the corresponding
* thread in the server gets cancelled.
*
* @author Julian Oes <julian@oes.ch>
* @author Beat Küng <beat-kueng@gmx.net>
* @author Mara Bos <m-ou.se@m-ou.se>
*/
#pragma once
#include <stdint.h>
#include "pipe_protocol.h"
#include "sock_protocol.h"
namespace px4_daemon
{
@ -57,19 +58,14 @@ class Client @@ -57,19 +58,14 @@ class Client
{
public:
Client(int instance_id = 0);
~Client();
/**
* Initialize the unique ID of the client.
*
* @return 0 on success.
*/
int generate_uuid();
/**
* Make sure to catch signals in order to forward them to the server.
*/
void register_sig_handler();
Client(Client &&other) : _fd(other._fd), _instance_id(other._instance_id)
{
// Steal the fd from the moved-from client.
other._fd = -1;
}
/**
* Process the supplied command line arguments and send them to server.
@ -81,22 +77,10 @@ public: @@ -81,22 +77,10 @@ public:
int process_args(const int argc, const char **argv);
private:
int _prepare_recv_pipe();
int _send_cmds(const int argc, const char **argv);
int _listen();
int _parse_client_recv_packet(const client_recv_packet_s &packet, int &retval, bool &should_exit);
int _retval_cmd_packet(const client_recv_packet_s &packet, int &retval);
int _stdout_msg_packet(const client_recv_packet_s &packet);
static void _static_sig_handler(int sig_num);
void _sig_handler(int sig_num);
uint64_t _uuid;
int _client_send_pipe_fd;
char _recv_pipe_path[RECV_PIPE_PATH_LEN];
int _fd;
int _instance_id; ///< instance ID for running multiple instances of the px4 server
};

337
platforms/posix/src/px4_daemon/server.cpp

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2016 PX4 Development Team. All rights reserved.
* Copyright (C) 2016, 2018 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -35,29 +35,27 @@ @@ -35,29 +35,27 @@
*
* @author Julian Oes <julian@oes.ch>
* @author Beat Küng <beat-kueng@gmx.net>
* @author Mara Bos <m-ou.se@m-ou.se>
*/
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <string>
#include <pthread.h>
#include <poll.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <vector>
#include <px4_log.h>
#include "pxh.h"
#include "server.h"
// In Cygwin opening and closing the same named pipe multiple times within one process doesn't work POSIX compliant.
// As a workaround we open the client send pipe file in read write mode such that we can keep it open all the time.
#if !defined(__PX4_CYGWIN)
#define CLIENT_SEND_PIPE_OFLAGS O_RDONLY
#else
#define CLIENT_SEND_PIPE_OFLAGS O_RDWR
#endif
namespace px4_daemon
{
@ -78,22 +76,37 @@ Server::~Server() @@ -78,22 +76,37 @@ Server::~Server()
int
Server::start()
{
std::string client_send_pipe_path = get_client_send_pipe_path(_instance_id);
std::string sock_path = get_socket_path(_instance_id);
// Delete socket in case it exists already.
unlink(sock_path.c_str());
_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (_fd < 0) {
PX4_ERR("error creating socket");
return -1;
}
// Delete pipe in case it exists already.
unlink(client_send_pipe_path.c_str());
sockaddr_un addr = {};
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, sock_path.c_str(), sizeof(addr.sun_path) - 1);
// Create new pipe to listen to clients.
// This needs to happen before we return from this method, so that the caller can launch clients.
mkfifo(client_send_pipe_path.c_str(), 0666);
if (bind(_fd, (sockaddr *)&addr, sizeof(addr)) < 0) {
PX4_ERR("error binding socket");
return -1;
}
if (listen(_fd, 10) < 0) {
PX4_ERR("error listing to socket");
return -1;
}
if (0 != pthread_create(&_server_main_pthread,
nullptr,
_server_main_trampoline,
nullptr)) {
this)) {
PX4_ERR("error creating client handler thread");
return -1;
}
@ -101,12 +114,9 @@ Server::start() @@ -101,12 +114,9 @@ Server::start()
}
void *
Server::_server_main_trampoline(void *arg)
Server::_server_main_trampoline(void *self)
{
if (_instance) {
_instance->_server_main(arg);
}
((Server *)self)->_server_main();
return nullptr;
}
@ -116,9 +126,8 @@ void Server::_pthread_key_destructor(void *arg) @@ -116,9 +126,8 @@ void Server::_pthread_key_destructor(void *arg)
}
void
Server::_server_main(void *arg)
Server::_server_main()
{
// Set thread specific pipe to supplied file descriptor.
int ret = pthread_key_create(&_key, _pthread_key_destructor);
if (ret != 0) {
@ -126,241 +135,151 @@ Server::_server_main(void *arg) @@ -126,241 +135,151 @@ Server::_server_main(void *arg)
return;
}
std::string client_send_pipe_path = get_client_send_pipe_path(_instance_id);
int client_send_pipe_fd = open(client_send_pipe_path.c_str(), CLIENT_SEND_PIPE_OFLAGS);
// The list of file descriptors to watch.
std::vector<pollfd> poll_fds;
while (true) {
// Watch the listening socket for incoming connections.
poll_fds.push_back(pollfd {_fd, POLLIN, 0});
client_send_packet_s packet;
while (true) {
int n_ready = poll(poll_fds.data(), poll_fds.size(), -1);
// We only read as much as we need, otherwise we might get out of
// sync with packets.
int bytes_read = read(client_send_pipe_fd, &packet, sizeof(client_send_packet_s::header));
if (n_ready < 0) {
PX4_ERR("poll() failed: %s", strerror(errno));
return;
}
if (bytes_read > 0) {
_lock();
// Using the header we can determine how big the payload is.
int payload_to_read = sizeof(packet)
- sizeof(packet.header)
- sizeof(packet.payload)
+ packet.header.payload_length;
// Handle any new connections.
if (poll_fds[0].revents & POLLIN) {
--n_ready;
int client = accept(_fd, nullptr, nullptr);
// Again, we only read as much as we need because otherwise we need
// hold a buffer and parse it.
bytes_read = read(client_send_pipe_fd, ((uint8_t *)&packet) + bytes_read, payload_to_read);
if (client == -1) {
PX4_ERR("failed to accept client: %s", strerror(errno));
_unlock();
return;
}
if (bytes_read > 0) {
// Start a new thread to handle the client.
pthread_t *thread = &_fd_to_thread[client];
ret = pthread_create(thread, nullptr, Server::_handle_client, (void *)(intptr_t)client);
_parse_client_send_packet(packet);
if (ret != 0) {
PX4_ERR("could not start pthread (%i)", ret);
_fd_to_thread.erase(client);
close(client);
} else {
// We won't join the thread, so detach to automatically release resources at its end
pthread_detach(*thread);
}
}
if (bytes_read == 0) {
// 0 means the pipe has been closed by all clients
// and we need to re-open it.
close(client_send_pipe_fd);
client_send_pipe_fd = open(client_send_pipe_path.c_str(), O_RDONLY);
// Start listening for the client hanging up.
poll_fds.push_back(pollfd {client, POLLHUP, 0});
}
}
close(client_send_pipe_fd);
}
// Handle any closed connections.
for (size_t i = 1; n_ready > 0 && i < poll_fds.size();) {
if (poll_fds[i].revents) {
--n_ready;
auto thread = _fd_to_thread.find(poll_fds[i].fd);
void
Server::_parse_client_send_packet(const client_send_packet_s &packet)
{
switch (packet.header.msg_id) {
case client_send_packet_s::message_header_s::e_msg_id::EXECUTE:
_execute_cmd_packet(packet);
break;
case client_send_packet_s::message_header_s::e_msg_id::KILL:
_kill_cmd_packet(packet);
break;
default:
PX4_ERR("send msg_id not handled");
break;
}
}
if (thread != _fd_to_thread.end()) {
// Thread is still running, so we cancel it.
// TODO: use a more graceful exit method to avoid resource leaks
pthread_cancel(thread->second);
_fd_to_thread.erase(thread);
}
void
Server::_execute_cmd_packet(const client_send_packet_s &packet)
{
if (packet.header.payload_length == 0) {
PX4_ERR("command length 0");
return;
}
close(poll_fds[i].fd);
poll_fds.erase(poll_fds.begin() + i);
// We open the client's specific pipe to write the return value and stdout back to.
// The pipe's path is created knowing the UUID of the client.
char path[RECV_PIPE_PATH_LEN];
int ret = get_client_recv_pipe_path(packet.header.client_uuid, path, RECV_PIPE_PATH_LEN);
if (ret < 0) {
PX4_ERR("failed to assemble path");
return;
}
int pipe_fd = open(path, O_WRONLY);
if (pipe_fd < 0) {
PX4_ERR("pipe open fail");
return;
}
// To execute a command we start a new thread.
pthread_t new_pthread;
// We need to copy everything that the new thread needs because we will go
// out of scope.
RunCmdArgs *args = new RunCmdArgs;
strncpy(args->cmd, (char *)packet.payload.execute_msg.cmd, sizeof(args->cmd));
args->client_uuid = packet.header.client_uuid;
args->pipe_fd = pipe_fd;
args->is_atty = packet.payload.execute_msg.is_atty;
_lock(); // need to lock, otherwise the thread could already exit before we insert into the map
ret = pthread_create(&new_pthread, nullptr, Server::_run_cmd, (void *)args);
} else {
++i;
}
}
if (ret != 0) {
PX4_ERR("could not start pthread (%i)", ret);
delete args;
} else {
// We won't join the thread, so detach to automatically release resources at its end
pthread_detach(new_pthread);
// We keep two maps for cleanup if a thread is finished or killed.
_client_uuid_to_pthread.insert(std::pair<uint64_t, pthread_t>
(packet.header.client_uuid, new_pthread));
_pthread_to_pipe_fd.insert(std::pair<pthread_t, int>
(new_pthread, pipe_fd));
_unlock();
}
_unlock();
close(_fd);
}
void
Server::_kill_cmd_packet(const client_send_packet_s &packet)
*Server::_handle_client(void *arg)
{
_lock();
int fd = (int)(intptr_t)arg;
// TODO: we currently ignore the signal type.
auto client_uuid_iter = _client_uuid_to_pthread.find(packet.header.client_uuid);
// Read until the end of the incoming stream.
std::string cmd;
if (client_uuid_iter == _client_uuid_to_pthread.end()) {
_unlock();
return;
}
pthread_t pthread_to_kill = client_uuid_iter->second;
// TODO: use a more graceful exit method to avoid resource leaks
int ret = pthread_cancel(pthread_to_kill);
while (true) {
size_t n = cmd.size();
cmd.resize(n + 1024);
ssize_t n_read = read(fd, &cmd[n], cmd.size() - n);
__cleanup_thread(packet.header.client_uuid);
if (n_read < 0) {
_cleanup(fd);
return nullptr;
}
_unlock();
cmd.resize(n + n_read);
if (ret != 0) {
PX4_ERR("failed to cancel thread");
if (n_read == 0) {
break;
}
}
// We don't send retval when we get killed.
// The client knows this and just exits without confirmation.
}
void
*Server::_run_cmd(void *arg)
{
RunCmdArgs *args = (RunCmdArgs *)arg;
// Copy arguments so that we can cleanup the arg structure.
uint64_t client_uuid = args->client_uuid;
int pipe_fd = args->pipe_fd;
bool is_atty = args->is_atty;
std::string message_str(args->cmd);
if (cmd.size() < 2) {
_cleanup(fd);
return nullptr;
}
// Clean up the args from the heap in case the thread gets canceled
// from outside.
delete args;
// Last byte is 'isatty'.
uint8_t isatty = cmd.back();
cmd.pop_back();
// We register thread specific data. This is used for PX4_INFO (etc.) log calls.
CmdThreadSpecificData *thread_data_ptr;
if ((thread_data_ptr = (CmdThreadSpecificData *)pthread_getspecific(_instance->_key)) == nullptr) {
thread_data_ptr = new CmdThreadSpecificData;
thread_data_ptr->pipe_fd = pipe_fd;
thread_data_ptr->is_atty = is_atty;
thread_data_ptr->packet.header.msg_id = client_recv_packet_s::message_header_s::e_msg_id::STDOUT;
thread_data_ptr->fd = fd;
thread_data_ptr->is_atty = isatty;
(void)pthread_setspecific(_instance->_key, (void *)thread_data_ptr);
}
// Run the actual command.
int retval = Pxh::process_line(message_str, true);
int retval = Pxh::process_line(cmd, true);
// Report return value.
_send_retval(pipe_fd, retval, client_uuid);
// Clean up before returning.
_instance->_cleanup_thread(client_uuid);
char buf[2] = {0, (char)retval};
return nullptr;
}
void
Server::_send_retval(const int pipe_fd, const int retval, const uint64_t client_uuid)
{
client_recv_packet_s packet;
packet.header.msg_id = client_recv_packet_s::message_header_s::e_msg_id::RETVAL;
packet.header.payload_length = sizeof(packet.payload.retval_msg);
packet.payload.retval_msg.retval = retval;
int bytes_to_send = get_client_recv_packet_length(&packet);
int bytes_sent = write(pipe_fd, &packet, bytes_to_send);
if (bytes_sent != bytes_to_send) {
printf("write fail\n");
return;
if (write(fd, buf, sizeof buf) < 0) {
// Don't care it went wrong, as we're cleaning up anyway.
}
}
void
Server::_cleanup_thread(const uint64_t client_uuid)
{
_lock();
__cleanup_thread(client_uuid);
_unlock();
_cleanup(fd);
return nullptr;
}
void
Server::__cleanup_thread(const uint64_t client_uuid)
Server::_cleanup(int fd)
{
pthread_t pthread_killed = _client_uuid_to_pthread[client_uuid];
auto pipe_iter = _pthread_to_pipe_fd.find(pthread_killed);
if (pipe_iter == _pthread_to_pipe_fd.end()) {
// can happen if the thread already exited and then got a kill packet
PX4_DEBUG("pipe fd already closed");
return;
}
int pipe_fd = pipe_iter->second;
close(pipe_fd);
char path[RECV_PIPE_PATH_LEN] = {};
get_client_recv_pipe_path(client_uuid, path, RECV_PIPE_PATH_LEN);
unlink(path);
_client_uuid_to_pthread.erase(client_uuid);
_pthread_to_pipe_fd.erase(pthread_killed);
_instance->_lock();
_instance->_fd_to_thread.erase(fd);
_instance->_unlock();
// We can't close() the fd here, since the main thread is probably
// polling for it: close()ing it causes a race condition.
// So, we only call shutdown(), which causes the main thread to register a
// 'POLLHUP', such that the main thread can close() it for us.
// We already removed this thread from _fd_to_thread, so there is no risk
// of the main thread trying to cancel this thread after it already exited.
shutdown(fd, SHUT_RDWR);
}
} //namespace px4_daemon

48
platforms/posix/src/px4_daemon/server.h

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2016 PX4 Development Team. All rights reserved.
* Copyright (C) 2016, 2018 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -33,12 +33,11 @@ @@ -33,12 +33,11 @@
/**
* @file server.h
*
* The server (also called daemon) opens a pipe for clients to write to.
* The server (also called daemon) opens a socket for clients to connect to.
*
* Once a client connects it will send a command as well as a unique ID.
* Once a client connects it will send a command and close its side of the connection.
* The server will return the stdout of the executing command, as well as the return
* value to the client on a client specific pipe.
* The client specific pipe are idenified by the unique ID of the client.
* value to the client.
*
* There should only every be one server running, therefore the static instance.
* The Singleton implementation is not complete, but it should be obvious not
@ -46,6 +45,7 @@ @@ -46,6 +45,7 @@
*
* @author Julian Oes <julian@oes.ch>
* @author Beat Küng <beat-kueng@gmx.net>
* @author Mara Bos <m-ou.se@m-ou.se>
*/
#pragma once
@ -54,7 +54,7 @@ @@ -54,7 +54,7 @@
#include <pthread.h>
#include <map>
#include "pipe_protocol.h"
#include "sock_protocol.h"
namespace px4_daemon
@ -74,11 +74,10 @@ public: @@ -74,11 +74,10 @@ public:
*/
int start();
struct CmdThreadSpecificData {
int pipe_fd; // pipe fd to send data to descriptor
int fd; // fd to send stdout to
bool is_atty; // whether file descriptor refers to a terminal
client_recv_packet_s packet;
char buffer[1024];
};
static bool is_running()
@ -92,17 +91,7 @@ public: @@ -92,17 +91,7 @@ public:
}
private:
static void *_server_main_trampoline(void *arg);
void _server_main(void *arg);
void _parse_client_send_packet(const client_send_packet_s &packet);
void _execute_cmd_packet(const client_send_packet_s &packet);
void _kill_cmd_packet(const client_send_packet_s &packet);
void _cleanup_thread(const uint64_t client_uuid);
/**
* Like _cleanup_thread(), but does not take the mutex
*/
void __cleanup_thread(const uint64_t client_uuid);
void _server_main();
void _lock()
{
@ -114,28 +103,19 @@ private: @@ -114,28 +103,19 @@ private:
pthread_mutex_unlock(&_mutex);
}
static void _send_retval(const int pipe_fd, const int retval, const uint64_t client_uuid);
struct RunCmdArgs {
char cmd[sizeof(client_send_packet_s::payload.execute_msg.cmd)];
uint64_t client_uuid;
bool is_atty;
int pipe_fd;
};
static void *_run_cmd(void *arg);
static void *_handle_client(void *arg);
static void _cleanup(int fd);
pthread_t _server_main_pthread;
std::map<pthread_t, int> _pthread_to_pipe_fd;
std::map<uint64_t, pthread_t> _client_uuid_to_pthread;
pthread_mutex_t _mutex; ///< protects access to _pthread_to_pipe_fd and _client_uuid_to_pthread
std::map<int, pthread_t> _fd_to_thread;
pthread_mutex_t _mutex; ///< Protects _fd_to_thread.
pthread_key_t _key;
int _instance_id; ///< instance ID for running multiple instances of the px4 server
int _fd;
static void _pthread_key_destructor(void *arg);

30
platforms/posix/src/px4_daemon/server_io.cpp

@ -50,7 +50,7 @@ @@ -50,7 +50,7 @@
#include "server.h"
#include <px4_daemon/server_io.h>
#include "pipe_protocol.h"
#include "sock_protocol.h"
using namespace px4_daemon;
@ -84,10 +84,8 @@ int get_stdout_pipe_buffer(char **buffer, unsigned *max_length, bool *is_atty) @@ -84,10 +84,8 @@ int get_stdout_pipe_buffer(char **buffer, unsigned *max_length, bool *is_atty)
#endif
client_recv_packet_s *packet = &thread_data_ptr->packet;
*buffer = (char *)packet->payload.stdout_msg.text;
*max_length = sizeof(packet->payload.stdout_msg.text);
*buffer = thread_data_ptr->buffer;
*max_length = sizeof(thread_data_ptr->buffer);
*is_atty = thread_data_ptr->is_atty;
return 0;
@ -95,8 +93,6 @@ int get_stdout_pipe_buffer(char **buffer, unsigned *max_length, bool *is_atty) @@ -95,8 +93,6 @@ int get_stdout_pipe_buffer(char **buffer, unsigned *max_length, bool *is_atty)
int send_stdout_pipe_buffer(unsigned buffer_length)
{
assert(buffer_length <= sizeof(client_recv_packet_s::payload.stdout_msg.text));
Server::CmdThreadSpecificData *thread_data_ptr;
if (!Server::is_running()) {
@ -108,25 +104,9 @@ int send_stdout_pipe_buffer(unsigned buffer_length) @@ -108,25 +104,9 @@ int send_stdout_pipe_buffer(unsigned buffer_length)
return -1;
}
client_recv_packet_s *packet = &thread_data_ptr->packet;
packet->header.payload_length = buffer_length;
int pipe_fd = thread_data_ptr->pipe_fd;
int bytes_to_send = get_client_recv_packet_length(packet);
// Check if we can write first by writing 0 bytes.
// If we don't do this, we'll get SIGPIPE and be very unhappy
// because the whole process will go down.
int ret = write(pipe_fd, nullptr, 0);
if (ret == 0 && errno == EPIPE) {
printf("Error: can't write to closed pipe, giving up.\n");
pthread_exit(nullptr);
}
int bytes_sent = write(pipe_fd, packet, bytes_to_send);
int bytes_sent = write(thread_data_ptr->fd, thread_data_ptr->buffer, buffer_length);
if (bytes_sent != bytes_to_send) {
if (bytes_sent != (int)buffer_length) {
printf("write fail\n");
return -1;
}

38
platforms/posix/src/px4_daemon/pipe_protocol.cpp → platforms/posix/src/px4_daemon/sock_protocol.cpp

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2016 PX4 Development Team. All rights reserved.
* Copyright (C) 2018 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -31,44 +31,20 @@ @@ -31,44 +31,20 @@
*
****************************************************************************/
/**
* @file pipe_protocol.cpp
* @file sock_protocol.cpp
*
* @author Julian Oes <julian@oes.ch>
* @author Beat Küng <beat-kueng@gmx.net>
* @author Mara Bos <m-ou.se@m-ou.se>
*/
#include <stdint.h>
#include <stdio.h>
#include <inttypes.h>
#include "pipe_protocol.h"
static const char CLIENT_SEND_PIPE_PATH[] = "/tmp/px4_client_send_pipe-";
static const char CLIENT_RECV_PIPE_PATH[] = "/tmp/px4_client_recv_pipe";
#include "sock_protocol.h"
namespace px4_daemon
{
unsigned get_client_send_packet_length(const client_send_packet_s *packet)
{
return sizeof(client_send_packet_s) - sizeof(packet->payload) + packet->header.payload_length;
}
unsigned get_client_recv_packet_length(const client_recv_packet_s *packet)
{
return sizeof(client_recv_packet_s) - sizeof(packet->payload) + packet->header.payload_length;
}
int get_client_recv_pipe_path(const uint64_t uuid, char *path, const size_t path_len)
{
return snprintf(path, path_len, "%s-%016" PRIx64, CLIENT_RECV_PIPE_PATH, uuid);
}
std::string get_client_send_pipe_path(int instance_id)
std::string get_socket_path(int instance_id)
{
return std::string(CLIENT_SEND_PIPE_PATH) + std::to_string(instance_id);
// TODO: Use /var/run/px4/$instance/sock (or /var/run/user/$UID/... for non-root).
return "/tmp/px4-sock-" + std::to_string(instance_id);
}
} // namespace px4_daemon

59
platforms/posix/src/px4_daemon/pipe_protocol.h → platforms/posix/src/px4_daemon/sock_protocol.h

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
/****************************************************************************
*
* Copyright (C) 2016 PX4 Development Team. All rights reserved.
* Copyright (C) 2018 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -31,69 +31,18 @@ @@ -31,69 +31,18 @@
*
****************************************************************************/
/**
* @file pipe_protocol.h
* @file sock_protocol.h
*
* @author Julian Oes <julian@oes.ch>
* @author Beat Küng <beat-kueng@gmx.net>
* @author Mara Bos <m-ou.se@m-ou.se>
*/
#pragma once
#include <stdint.h>
#include <string>
namespace px4_daemon
{
static const unsigned RECV_PIPE_PATH_LEN = 64;
// The packet size is no more than 512 bytes, because that is the minimum guaranteed size
// for a pipe to avoid interleaving of messages when multiple clients write at the same time
// (atomic writes).
struct client_send_packet_s {
struct message_header_s {
uint64_t client_uuid;
enum class e_msg_id : int {
EXECUTE,
KILL
} msg_id;
unsigned payload_length;
} header;
union {
struct execute_msg_s {
uint8_t is_atty;
uint8_t cmd[512 - sizeof(message_header_s) - sizeof(uint8_t)];
} execute_msg;
struct kill_msg_s {
int cmd_id;
} kill_msg;
} payload;
};
// We have per client receiver a pipe with the uuid in its file path.
struct client_recv_packet_s {
struct message_header_s {
enum class e_msg_id : int {
RETVAL,
STDOUT
} msg_id;
unsigned payload_length;
} header;
union {
struct retval_msg_s {
int retval;
} retval_msg;
struct stdout_msg_s {
uint8_t text[512 - sizeof(message_header_s)]; ///< null-terminated string (payload_length includes the null)
} stdout_msg;
} payload;
};
unsigned get_client_send_packet_length(const client_send_packet_s *packet);
unsigned get_client_recv_packet_length(const client_recv_packet_s *packet);
int get_client_recv_pipe_path(const uint64_t uuid, char *path, const size_t path_len);
std::string get_client_send_pipe_path(int instance_id);
std::string get_socket_path(int instance_id);
} // namespace px4_daemon
Loading…
Cancel
Save