You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
377 lines
8.9 KiB
377 lines
8.9 KiB
#ifndef MAVLINKPROTOBUFMANAGER_HPP |
|
#define MAVLINKPROTOBUFMANAGER_HPP |
|
|
|
#include <deque> |
|
#include <google/protobuf/message.h> |
|
#include <iostream> |
|
#include <tr1/memory> |
|
|
|
#include "checksum.h" |
|
#include "common/mavlink.h" |
|
#include "mavlink_types.h" |
|
#include <pixhawk/pixhawk.pb.h> |
|
|
|
namespace mavlink |
|
{ |
|
|
|
class ProtobufManager |
|
{ |
|
public: |
|
ProtobufManager() |
|
: mRegisteredTypeCount(0) |
|
, mStreamID(0) |
|
, mVerbose(false) |
|
, kExtendedHeaderSize(MAVLINK_EXTENDED_HEADER_LEN) |
|
, kExtendedPayloadMaxSize(MAVLINK_MAX_EXTENDED_PAYLOAD_LEN) |
|
{ |
|
// register GLOverlay |
|
{ |
|
std::tr1::shared_ptr<px::GLOverlay> msg(new px::GLOverlay); |
|
registerType(msg); |
|
} |
|
|
|
// register ObstacleList |
|
{ |
|
std::tr1::shared_ptr<px::ObstacleList> msg(new px::ObstacleList); |
|
registerType(msg); |
|
} |
|
|
|
// register ObstacleMap |
|
{ |
|
std::tr1::shared_ptr<px::ObstacleMap> msg(new px::ObstacleMap); |
|
registerType(msg); |
|
} |
|
|
|
// register Path |
|
{ |
|
std::tr1::shared_ptr<px::Path> msg(new px::Path); |
|
registerType(msg); |
|
} |
|
|
|
// register PointCloudXYZI |
|
{ |
|
std::tr1::shared_ptr<px::PointCloudXYZI> msg(new px::PointCloudXYZI); |
|
registerType(msg); |
|
} |
|
|
|
// register PointCloudXYZRGB |
|
{ |
|
std::tr1::shared_ptr<px::PointCloudXYZRGB> msg(new px::PointCloudXYZRGB); |
|
registerType(msg); |
|
} |
|
|
|
// register RGBDImage |
|
{ |
|
std::tr1::shared_ptr<px::RGBDImage> msg(new px::RGBDImage); |
|
registerType(msg); |
|
} |
|
|
|
srand(time(NULL)); |
|
mStreamID = rand() + 1; |
|
} |
|
|
|
bool fragmentMessage(uint8_t system_id, uint8_t component_id, |
|
uint8_t target_system, uint8_t target_component, |
|
const google::protobuf::Message& protobuf_msg, |
|
std::vector<mavlink_extended_message_t>& fragments) const |
|
{ |
|
TypeMap::const_iterator it = mTypeMap.find(protobuf_msg.GetTypeName()); |
|
if (it == mTypeMap.end()) |
|
{ |
|
std::cout << "# WARNING: Protobuf message with type " |
|
<< protobuf_msg.GetTypeName() << " is not registered." |
|
<< std::endl; |
|
return false; |
|
} |
|
|
|
uint8_t typecode = it->second; |
|
|
|
std::string data = protobuf_msg.SerializeAsString(); |
|
|
|
int fragmentCount = (protobuf_msg.ByteSize() + kExtendedPayloadMaxSize - 1) / kExtendedPayloadMaxSize; |
|
unsigned int offset = 0; |
|
|
|
for (int i = 0; i < fragmentCount; ++i) |
|
{ |
|
mavlink_extended_message_t fragment; |
|
|
|
// write extended header data |
|
uint8_t* payload = reinterpret_cast<uint8_t*>(fragment.base_msg.payload64); |
|
unsigned int length = 0; |
|
uint8_t flags = 0; |
|
|
|
if (i < fragmentCount - 1) |
|
{ |
|
length = kExtendedPayloadMaxSize; |
|
flags |= 0x1; |
|
} |
|
else |
|
{ |
|
length = protobuf_msg.ByteSize() - kExtendedPayloadMaxSize * (fragmentCount - 1); |
|
} |
|
|
|
memcpy(payload, &target_system, 1); |
|
memcpy(payload + 1, &target_component, 1); |
|
memcpy(payload + 2, &typecode, 1); |
|
memcpy(payload + 3, &length, 4); |
|
memcpy(payload + 7, &mStreamID, 2); |
|
memcpy(payload + 9, &offset, 4); |
|
memcpy(payload + 13, &flags, 1); |
|
|
|
fragment.base_msg.msgid = MAVLINK_MSG_ID_EXTENDED_MESSAGE; |
|
mavlink_finalize_message(&fragment.base_msg, system_id, component_id, kExtendedHeaderSize, 0); |
|
|
|
// write extended payload data |
|
fragment.extended_payload_len = length; |
|
memcpy(fragment.extended_payload, &data[offset], length); |
|
|
|
fragments.push_back(fragment); |
|
offset += length; |
|
} |
|
|
|
if (mVerbose) |
|
{ |
|
std::cerr << "# INFO: Split extended message with size " |
|
<< protobuf_msg.ByteSize() << " into " |
|
<< fragmentCount << " fragments." << std::endl; |
|
} |
|
|
|
return true; |
|
} |
|
|
|
bool cacheFragment(mavlink_extended_message_t& msg) |
|
{ |
|
if (!validFragment(msg)) |
|
{ |
|
if (mVerbose) |
|
{ |
|
std::cerr << "# WARNING: Message is not a valid fragment. " |
|
<< "Dropping message..." << std::endl; |
|
} |
|
return false; |
|
} |
|
|
|
// read extended header |
|
uint8_t* payload = reinterpret_cast<uint8_t*>(msg.base_msg.payload64); |
|
uint8_t typecode = 0; |
|
unsigned int length = 0; |
|
unsigned short streamID = 0; |
|
unsigned int offset = 0; |
|
uint8_t flags = 0; |
|
|
|
memcpy(&typecode, payload + 2, 1); |
|
memcpy(&length, payload + 3, 4); |
|
memcpy(&streamID, payload + 7, 2); |
|
memcpy(&offset, payload + 9, 4); |
|
memcpy(&flags, payload + 13, 1); |
|
|
|
if (typecode >= mTypeMap.size()) |
|
{ |
|
std::cout << "# WARNING: Protobuf message with type code " |
|
<< static_cast<int>(typecode) << " is not registered." << std::endl; |
|
return false; |
|
} |
|
|
|
bool reassemble = false; |
|
|
|
FragmentQueue::iterator it = mFragmentQueue.find(streamID); |
|
if (it == mFragmentQueue.end()) |
|
{ |
|
if (offset == 0) |
|
{ |
|
mFragmentQueue[streamID].push_back(msg); |
|
|
|
if ((flags & 0x1) != 0x1) |
|
{ |
|
reassemble = true; |
|
} |
|
|
|
if (mVerbose) |
|
{ |
|
std::cerr << "# INFO: Added fragment to new queue." |
|
<< std::endl; |
|
} |
|
} |
|
else |
|
{ |
|
if (mVerbose) |
|
{ |
|
std::cerr << "# WARNING: Message is not a valid fragment. " |
|
<< "Dropping message..." << std::endl; |
|
} |
|
} |
|
} |
|
else |
|
{ |
|
std::deque<mavlink_extended_message_t>& queue = it->second; |
|
|
|
if (queue.empty()) |
|
{ |
|
if (offset == 0) |
|
{ |
|
queue.push_back(msg); |
|
|
|
if ((flags & 0x1) != 0x1) |
|
{ |
|
reassemble = true; |
|
} |
|
} |
|
else |
|
{ |
|
if (mVerbose) |
|
{ |
|
std::cerr << "# WARNING: Message is not a valid fragment. " |
|
<< "Dropping message..." << std::endl; |
|
} |
|
} |
|
} |
|
else |
|
{ |
|
if (fragmentDataSize(queue.back()) + fragmentOffset(queue.back()) != offset) |
|
{ |
|
if (mVerbose) |
|
{ |
|
std::cerr << "# WARNING: Previous fragment(s) have been lost. " |
|
<< "Dropping message and clearing queue..." << std::endl; |
|
} |
|
queue.clear(); |
|
} |
|
else |
|
{ |
|
queue.push_back(msg); |
|
|
|
if ((flags & 0x1) != 0x1) |
|
{ |
|
reassemble = true; |
|
} |
|
} |
|
} |
|
} |
|
|
|
if (reassemble) |
|
{ |
|
std::deque<mavlink_extended_message_t>& queue = mFragmentQueue[streamID]; |
|
|
|
std::string data; |
|
for (size_t i = 0; i < queue.size(); ++i) |
|
{ |
|
mavlink_extended_message_t& mavlink_msg = queue.at(i); |
|
|
|
data.append(reinterpret_cast<char*>(&mavlink_msg.extended_payload[0]), |
|
static_cast<size_t>(mavlink_msg.extended_payload_len)); |
|
} |
|
|
|
mMessages.at(typecode)->ParseFromString(data); |
|
|
|
mMessageAvailable.at(typecode) = true; |
|
|
|
queue.clear(); |
|
|
|
if (mVerbose) |
|
{ |
|
std::cerr << "# INFO: Reassembled fragments for message with typename " |
|
<< mMessages.at(typecode)->GetTypeName() << " and size " |
|
<< mMessages.at(typecode)->ByteSize() |
|
<< "." << std::endl; |
|
} |
|
} |
|
|
|
return true; |
|
} |
|
|
|
bool getMessage(std::tr1::shared_ptr<google::protobuf::Message>& msg) |
|
{ |
|
for (size_t i = 0; i < mMessageAvailable.size(); ++i) |
|
{ |
|
if (mMessageAvailable.at(i)) |
|
{ |
|
msg = mMessages.at(i); |
|
mMessageAvailable.at(i) = false; |
|
|
|
return true; |
|
} |
|
} |
|
|
|
return false; |
|
} |
|
|
|
private: |
|
void registerType(const std::tr1::shared_ptr<google::protobuf::Message>& msg) |
|
{ |
|
mTypeMap[msg->GetTypeName()] = mRegisteredTypeCount; |
|
++mRegisteredTypeCount; |
|
mMessages.push_back(msg); |
|
mMessageAvailable.push_back(false); |
|
} |
|
|
|
bool validFragment(const mavlink_extended_message_t& msg) const |
|
{ |
|
if (msg.base_msg.magic != MAVLINK_STX || |
|
msg.base_msg.len != kExtendedHeaderSize || |
|
msg.base_msg.msgid != MAVLINK_MSG_ID_EXTENDED_MESSAGE) |
|
{ |
|
return false; |
|
} |
|
|
|
uint16_t checksum; |
|
checksum = crc_calculate(reinterpret_cast<const uint8_t*>(&msg.base_msg.len), MAVLINK_CORE_HEADER_LEN); |
|
crc_accumulate_buffer(&checksum, reinterpret_cast<const char*>(&msg.base_msg.payload64), kExtendedHeaderSize); |
|
#if MAVLINK_CRC_EXTRA |
|
static const uint8_t mavlink_message_crcs[256] = MAVLINK_MESSAGE_CRCS; |
|
crc_accumulate(mavlink_message_crcs[msg.base_msg.msgid], &checksum); |
|
#endif |
|
|
|
if (mavlink_ck_a(&(msg.base_msg)) != (uint8_t)(checksum & 0xFF) && |
|
mavlink_ck_b(&(msg.base_msg)) != (uint8_t)(checksum >> 8)) |
|
{ |
|
return false; |
|
} |
|
|
|
return true; |
|
} |
|
|
|
unsigned int fragmentDataSize(const mavlink_extended_message_t& msg) const |
|
{ |
|
const uint8_t* payload = reinterpret_cast<const uint8_t*>(msg.base_msg.payload64); |
|
|
|
return *(reinterpret_cast<const unsigned int*>(payload + 3)); |
|
} |
|
|
|
unsigned int fragmentOffset(const mavlink_extended_message_t& msg) const |
|
{ |
|
const uint8_t* payload = reinterpret_cast<const uint8_t*>(msg.base_msg.payload64); |
|
|
|
return *(reinterpret_cast<const unsigned int*>(payload + 9)); |
|
} |
|
|
|
int mRegisteredTypeCount; |
|
unsigned short mStreamID; |
|
bool mVerbose; |
|
|
|
typedef std::map<std::string, uint8_t> TypeMap; |
|
TypeMap mTypeMap; |
|
std::vector< std::tr1::shared_ptr<google::protobuf::Message> > mMessages; |
|
std::vector<bool> mMessageAvailable; |
|
|
|
typedef std::map<unsigned short, std::deque<mavlink_extended_message_t> > FragmentQueue; |
|
FragmentQueue mFragmentQueue; |
|
|
|
const int kExtendedHeaderSize; |
|
/** |
|
* Extended header structure |
|
* ========================= |
|
* byte 0 - target_system |
|
* byte 1 - target_component |
|
* byte 2 - extended message id (type code) |
|
* bytes 3-6 - extended payload size in bytes |
|
* byte 7-8 - stream ID |
|
* byte 9-12 - fragment offset |
|
* byte 13 - fragment flags (bit 0 - 1=more fragments, 0=last fragment) |
|
*/ |
|
|
|
const int kExtendedPayloadMaxSize; |
|
}; |
|
|
|
} |
|
|
|
#endif
|
|
|