Browse Source

Fixed threading and transmission issues for FTP

sbg
Lorenz Meier 11 years ago
parent
commit
a103fef948
  1. 4
      src/modules/mavlink/mavlink_ftp.cpp
  2. 33
      src/modules/mavlink/mavlink_ftp.h
  3. 62
      src/modules/mavlink/mavlink_main.cpp
  4. 2
      src/modules/mavlink/mavlink_receiver.cpp

4
src/modules/mavlink/mavlink_ftp.cpp

@ -64,7 +64,7 @@ MavlinkFTP::MavlinkFTP()
} }
void void
MavlinkFTP::handle_message(mavlink_message_t *msg, mavlink_channel_t channel) MavlinkFTP::handle_message(Mavlink* mavlink, mavlink_message_t *msg)
{ {
// get a free request // get a free request
auto req = _dqFree(); auto req = _dqFree();
@ -73,7 +73,7 @@ MavlinkFTP::handle_message(mavlink_message_t *msg, mavlink_channel_t channel)
if (req != nullptr) { if (req != nullptr) {
// decode the request // decode the request
if (req->decode(msg, channel)) { if (req->decode(mavlink, msg)) {
// and queue it for the worker // and queue it for the worker
work_queue(LPWORK, &req->work, &MavlinkFTP::_workerTrampoline, req, 0); work_queue(LPWORK, &req->work, &MavlinkFTP::_workerTrampoline, req, 0);

33
src/modules/mavlink/mavlink_ftp.h

@ -64,8 +64,8 @@ public:
static MavlinkFTP *getServer(); static MavlinkFTP *getServer();
// static interface // static interface
void handle_message(mavlink_message_t *msg, void handle_message(Mavlink* mavlink,
mavlink_channel_t channel); mavlink_message_t *msg);
private: private:
@ -145,9 +145,9 @@ private:
work_s work; work_s work;
}; };
bool decode(mavlink_message_t *fromMessage, mavlink_channel_t fromChannel) { bool decode(Mavlink *mavlink, mavlink_message_t *fromMessage) {
if (fromMessage->msgid == MAVLINK_MSG_ID_ENCAPSULATED_DATA) { if (fromMessage->msgid == MAVLINK_MSG_ID_ENCAPSULATED_DATA) {
_channel = fromChannel; _mavlink = mavlink;
mavlink_msg_encapsulated_data_decode(fromMessage, &_message); mavlink_msg_encapsulated_data_decode(fromMessage, &_message);
return true; return true;
} }
@ -155,7 +155,26 @@ private:
} }
void reply() { void reply() {
mavlink_msg_encapsulated_data_send(_channel, sequence(), rawData());
// XXX the proper way would be an IOCTL / uORB call, rather than exploiting the
// flat memory architecture, as we're operating between threads here.
mavlink_message_t msg;
msg.checksum = 0;
unsigned len = mavlink_msg_encapsulated_data_pack_chan(_mavlink->get_system_id(), _mavlink->get_component_id(),
_mavlink->get_channel(), &msg, sequence(), rawData());
// unsigned len = mavlink_msg_system_time_pack_chan(_mavlink->get_system_id(), _mavlink->get_component_id(),
// _mavlink->get_channel(), &msg, 255, 255);
if (!_mavlink->message_buffer_write(&msg, len+2)) {
warnx("FTP TX ERR");
} else {
warnx("wrote: sys: %d, comp: %d, chan: %d, len: %d, checksum: %d",
_mavlink->get_system_id(),
_mavlink->get_component_id(),
_mavlink->get_channel(),
len,
msg.checksum);
}
} }
uint8_t *rawData() { return &_message.data[0]; } uint8_t *rawData() { return &_message.data[0]; }
@ -163,12 +182,12 @@ private:
uint8_t *requestData() { return &(header()->data[0]); } uint8_t *requestData() { return &(header()->data[0]); }
unsigned dataSize() { return header()->size + sizeof(RequestHeader); } unsigned dataSize() { return header()->size + sizeof(RequestHeader); }
uint16_t sequence() const { return _message.seqnr; } uint16_t sequence() const { return _message.seqnr; }
mavlink_channel_t &channel() { return _channel; } mavlink_channel_t channel() { return _mavlink->get_channel(); }
char *dataAsCString(); char *dataAsCString();
private: private:
mavlink_channel_t _channel; Mavlink *_mavlink;
mavlink_encapsulated_data_t _message; mavlink_encapsulated_data_t _message;
}; };

62
src/modules/mavlink/mavlink_main.cpp

@ -83,6 +83,10 @@
#include "mavlink_rate_limiter.h" #include "mavlink_rate_limiter.h"
#include "mavlink_commands.h" #include "mavlink_commands.h"
#ifndef MAVLINK_CRC_EXTRA
#error MAVLINK_CRC_EXTRA has to be defined on PX4 systems
#endif
/* oddly, ERROR is not defined for c++ */ /* oddly, ERROR is not defined for c++ */
#ifdef ERROR #ifdef ERROR
# undef ERROR # undef ERROR
@ -114,6 +118,7 @@ static uint64_t last_write_try_times[6] = {0};
void void
mavlink_send_uart_bytes(mavlink_channel_t channel, const uint8_t *ch, int length) mavlink_send_uart_bytes(mavlink_channel_t channel, const uint8_t *ch, int length)
{ {
Mavlink *instance; Mavlink *instance;
switch (channel) { switch (channel) {
@ -198,7 +203,7 @@ mavlink_send_uart_bytes(mavlink_channel_t channel, const uint8_t *ch, int length
ssize_t ret = write(uart, ch, desired); ssize_t ret = write(uart, ch, desired);
if (ret != desired) { if (ret != desired) {
warnx("TX FAIL"); // XXX overflow perf
} else { } else {
last_write_success_times[(unsigned)channel] = last_write_try_times[(unsigned)channel]; last_write_success_times[(unsigned)channel] = last_write_try_times[(unsigned)channel];
} }
@ -230,6 +235,7 @@ Mavlink::Mavlink() :
_verbose(false), _verbose(false),
_forwarding_on(false), _forwarding_on(false),
_passing_on(false), _passing_on(false),
_ftp_on(false),
_uart_fd(-1), _uart_fd(-1),
_mavlink_param_queue_index(0), _mavlink_param_queue_index(0),
_subscribe_to_stream(nullptr), _subscribe_to_stream(nullptr),
@ -453,7 +459,7 @@ Mavlink::get_instance_id()
return _instance_id; return _instance_id;
} }
mavlink_channel_t const mavlink_channel_t
Mavlink::get_channel() Mavlink::get_channel()
{ {
return _channel; return _channel;
@ -536,6 +542,16 @@ void Mavlink::mavlink_update_system(void)
_use_hil_gps = (bool)use_hil_gps; _use_hil_gps = (bool)use_hil_gps;
} }
int Mavlink::get_system_id()
{
return mavlink_system.sysid;
}
int Mavlink::get_component_id()
{
return mavlink_system.compid;
}
int Mavlink::mavlink_open_uart(int baud, const char *uart_name, struct termios *uart_config_original, bool *is_usb) int Mavlink::mavlink_open_uart(int baud, const char *uart_name, struct termios *uart_config_original, bool *is_usb)
{ {
/* process baud rate */ /* process baud rate */
@ -1649,11 +1665,21 @@ Mavlink::configure_stream_threadsafe(const char *stream_name, const float rate)
int int
Mavlink::message_buffer_init(int size) Mavlink::message_buffer_init(int size)
{ {
_message_buffer.size = size; _message_buffer.size = size;
_message_buffer.write_ptr = 0; _message_buffer.write_ptr = 0;
_message_buffer.read_ptr = 0; _message_buffer.read_ptr = 0;
_message_buffer.data = (char*)malloc(_message_buffer.size); _message_buffer.data = (char*)malloc(_message_buffer.size);
return (_message_buffer.data == 0) ? ERROR : OK;
int ret;
if (_message_buffer.data == 0) {
ret = ERROR;
_message_buffer.size = 0;
} else {
ret = OK;
}
return ret;
} }
void void
@ -1781,7 +1807,7 @@ Mavlink::task_main(int argc, char *argv[])
* set error flag instead */ * set error flag instead */
bool err_flag = false; bool err_flag = false;
while ((ch = getopt(argc, argv, "b:r:d:m:fpvw")) != EOF) { while ((ch = getopt(argc, argv, "b:r:d:m:fpvwx")) != EOF) {
switch (ch) { switch (ch) {
case 'b': case 'b':
_baudrate = strtoul(optarg, NULL, 10); _baudrate = strtoul(optarg, NULL, 10);
@ -1837,6 +1863,10 @@ Mavlink::task_main(int argc, char *argv[])
_wait_to_transmit = true; _wait_to_transmit = true;
break; break;
case 'x':
_ftp_on = true;
break;
default: default:
err_flag = true; err_flag = true;
break; break;
@ -1902,9 +1932,9 @@ Mavlink::task_main(int argc, char *argv[])
mavlink_logbuffer_init(&_logbuffer, 5); mavlink_logbuffer_init(&_logbuffer, 5);
/* if we are passing on mavlink messages, we need to prepare a buffer for this instance */ /* if we are passing on mavlink messages, we need to prepare a buffer for this instance */
if (_passing_on) { if (_passing_on || _ftp_on) {
/* initialize message buffer if multiplexing is on */ /* initialize message buffer if multiplexing is on */
if (OK != message_buffer_init(500)) { if (OK != message_buffer_init(2 * MAVLINK_MAX_PACKET_LEN)) {
errx(1, "can't allocate message buffer, exiting"); errx(1, "can't allocate message buffer, exiting");
} }
@ -2064,8 +2094,8 @@ Mavlink::task_main(int argc, char *argv[])
} }
} }
/* pass messages from other UARTs */ /* pass messages from other UARTs or FTP worker */
if (_passing_on) { if (_passing_on || _ftp_on) {
bool is_part; bool is_part;
void *read_ptr; void *read_ptr;
@ -2076,11 +2106,21 @@ Mavlink::task_main(int argc, char *argv[])
pthread_mutex_unlock(&_message_buffer_mutex); pthread_mutex_unlock(&_message_buffer_mutex);
if (available > 0) { if (available > 0) {
// int oldseq = mavlink_get_channel_status(get_channel())->current_tx_seq;
const mavlink_message_t* msg = (const mavlink_message_t*)read_ptr;
/* write first part of buffer */ /* write first part of buffer */
_mavlink_resend_uart(_channel, (const mavlink_message_t*)read_ptr); _mavlink_resend_uart(_channel, msg);
// mavlink_get_channel_status(get_channel())->current_tx_seq = oldseq;
// mavlink_msg_system_time_send(get_channel(), 255, 255);
message_buffer_mark_read(available); message_buffer_mark_read(available);
/* write second part of buffer if there is some */ /* write second part of buffer if there is some */
// XXX this doesn't quite work, as the resend UART call assumes a continous block
if (is_part) { if (is_part) {
/* guard get ptr by mutex */ /* guard get ptr by mutex */
pthread_mutex_lock(&_message_buffer_mutex); pthread_mutex_lock(&_message_buffer_mutex);
@ -2139,7 +2179,7 @@ Mavlink::task_main(int argc, char *argv[])
/* close mavlink logging device */ /* close mavlink logging device */
close(_mavlink_fd); close(_mavlink_fd);
if (_passing_on) { if (_passing_on || _ftp_on) {
message_buffer_destroy(); message_buffer_destroy();
pthread_mutex_destroy(&_message_buffer_mutex); pthread_mutex_destroy(&_message_buffer_mutex);
} }
@ -2281,7 +2321,7 @@ Mavlink::stream(int argc, char *argv[])
static void usage() static void usage()
{ {
warnx("usage: mavlink {start|stop-all|stream} [-d device] [-b baudrate] [-r rate] [-m mode] [-s stream] [-f] [-p] [-v] [-w]"); warnx("usage: mavlink {start|stop-all|stream} [-d device] [-b baudrate]\n\t[-r rate][-m mode] [-s stream] [-f] [-p] [-v] [-w] [-x]");
} }
int mavlink_main(int argc, char *argv[]) int mavlink_main(int argc, char *argv[])

2
src/modules/mavlink/mavlink_receiver.cpp

@ -154,7 +154,7 @@ MavlinkReceiver::handle_message(mavlink_message_t *msg)
break; break;
case MAVLINK_MSG_ID_ENCAPSULATED_DATA: case MAVLINK_MSG_ID_ENCAPSULATED_DATA:
MavlinkFTP::getServer()->handle_message(msg, _mavlink->get_channel()); MavlinkFTP::getServer()->handle_message(_mavlink, msg);
break; break;
default: default:

Loading…
Cancel
Save