22 #include <boost/enable_shared_from_this.hpp> 
   37     class PublishTask : 
public tbb::task
 
   43                   const google::protobuf::Message &_message)
 
   46         this->msg = _message.New();
 
   47         this->msg->CopyFrom(_message);
 
   52       public: tbb::task *execute()
 
   54                 this->pub->WaitForConnection();
 
   55                 this->pub->Publish(*this->msg, 
true);
 
   56                 this->pub->SendMessage();
 
   66       private: google::protobuf::Message *msg;
 
   76     class Node : 
public boost::enable_shared_from_this<Node>
 
   82       public: 
virtual ~Node();
 
   88       public: 
void Init(
const std::string &_space =
"");
 
  109       public: 
unsigned int GetId() 
const;
 
  130       public: 
template<
typename M>
 
  132                   const google::protobuf::Message &_message)
 
  135                 PublishTask *task = 
new(tbb::task::allocate_root())
 
  136                   PublishTask(pub, _message);
 
  138                 tbb::task::enqueue(*task);
 
  149       public: 
template<
typename M>
 
  151                                         unsigned int _queueLimit = 1000,
 
  157               decodedTopic, _queueLimit, _hzRate);
 
  159         boost::mutex::scoped_lock lock(this->publisherMutex);
 
  160         publisher->SetNode(shared_from_this());
 
  161         this->publishers.push_back(publisher);
 
  173       public: 
template<
typename M, 
typename T>
 
  175           void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
 
  176           bool _latching = 
false)
 
  180         ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
 
  183           boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
 
  191         result->SetCallbackId(this->callbacks[decodedTopic].back()->
GetId());
 
  202       public: 
template<
typename M>
 
  204           void(*_fp)(
const boost::shared_ptr<M const> &),
 
  205                      bool _latching = 
false)
 
  209         ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
 
  212           boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
 
  213           this->callbacks[decodedTopic].push_back(
 
  220         result->SetCallbackId(this->callbacks[decodedTopic].back()->
GetId());
 
  234           void(T::*_fp)(
const std::string &), T *_obj,
 
  235           bool _latching = 
false)
 
  239         ops.
Init(decodedTopic, shared_from_this(), _latching);
 
  242           boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
 
  250         result->SetCallbackId(this->callbacks[decodedTopic].back()->
GetId());
 
  263           void(*_fp)(
const std::string &), 
bool _latching = 
false)
 
  267         ops.
Init(decodedTopic, shared_from_this(), _latching);
 
  270           boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
 
  271           this->callbacks[decodedTopic].push_back(
 
  278         result->SetCallbackId(this->callbacks[decodedTopic].back()->
GetId());
 
  287       public: 
bool HandleData(
const std::string &_topic,
 
  288                               const std::string &_msg);
 
  303                                     const std::string &_msg);
 
  317       public: std::string 
GetMsgType(
const std::string &_topic) 
const;
 
  324       public: 
void RemoveCallback(
const std::string &_topic, 
unsigned int _id);
 
  326       private: std::string topicNamespace;
 
  327       private: std::vector<PublisherPtr> publishers;
 
  328       private: std::vector<PublisherPtr>::iterator publishersIter;
 
  329       private: 
static unsigned int idCounter;
 
  330       private: 
unsigned int id;
 
  332       private: 
typedef std::list<CallbackHelperPtr> Callback_L;
 
  333       private: 
typedef std::map<std::string, Callback_L> Callback_M;
 
  334       private: Callback_M callbacks;
 
  335       private: std::map<std::string, std::list<std::string> > incomingMsgs;
 
  338       private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
 
  340       private: boost::mutex publisherMutex;
 
  341       private: boost::mutex publisherDeleteMutex;
 
  342       private: boost::recursive_mutex incomingMutex;
 
  346       private: boost::recursive_mutex processIncomingMutex;
 
  348       private: 
bool initialized;