22 #include <boost/enable_shared_from_this.hpp>
/// \brief TBB task that publishes its own copy of a protobuf message
/// through this->pub when the task is executed.
/// NOTE(review): this excerpt omits lines of the class; the start of the
/// constructor, the braces and the pub member are not visible here.
37 class PublishTask :
public tbb::task
// Final parameter of a signature whose start is not visible in this excerpt
// (presumably the constructor -- confirm).
43 const google::protobuf::Message &_message)
// Create a new message object via _message.New() and copy the contents of
// _message into it, so this->msg does not alias the caller's message.
46 this->msg = _message.New();
47 this->msg->CopyFrom(_message);
/// \brief Task body: calls WaitForConnection(), Publish() and SendMessage()
/// on this->pub, in that order.
/// \return A tbb::task pointer; the returned value is not visible in this
/// excerpt.
52 public: tbb::task *execute()
54 this->pub->WaitForConnection();
// The second argument (true) is interpreted by Publish(), which is declared
// elsewhere -- presumably a blocking flag; confirm against the publisher.
55 this->pub->Publish(*this->msg,
true);
56 this->pub->SendMessage();
/// \brief Message copy created from _message.New() above.
/// NOTE(review): the matching delete is not visible in this excerpt --
/// confirm where this pointer is released.
66 private: google::protobuf::Message *msg;
/// \brief Transport node class. It derives from
/// boost::enable_shared_from_this<Node>, which is what allows its member
/// functions to call shared_from_this() when handing this node to
/// publishers and subscribe options.
76 class Node :
public boost::enable_shared_from_this<Node>
/// \brief Destructor, declared virtual. Only the declaration appears here.
82 public:
virtual ~Node();
/// \brief Initialize the node. Only the declaration appears here.
/// \param[in] _space String argument; defaults to the empty string.
/// NOTE(review): presumably the topic namespace (cf. the topicNamespace
/// member) -- confirm against the definition.
88 public:
void Init(
const std::string &_space =
"");
/// \brief Const accessor returning an unsigned int. Only the declaration
/// appears here.
/// \return NOTE(review): presumably the value of the private id member --
/// confirm against the definition.
109 public:
unsigned int GetId()
const;
/// \brief Member template (on type M) that hands a message to the TBB
/// scheduler for publishing by wrapping it in a PublishTask.
/// NOTE(review): the function name, its leading parameter(s) and the
/// declaration of pub are not visible in this excerpt.
/// \param[in] _message Message passed on to the PublishTask constructor.
130 public:
template<
typename M>
132 const google::protobuf::Message &_message)
// Construct the task with placement new in storage obtained from
// tbb::task::allocate_root(), passing pub and _message to PublishTask.
135 PublishTask *task =
new(tbb::task::allocate_root())
136 PublishTask(pub, _message);
// Queue the task with TBB; tbb::task::enqueue schedules it for later
// execution instead of running it inline.
138 tbb::task::enqueue(*task);
/// \brief Member template (on type M) that attaches a publisher to this
/// node and records it in the publishers list.
/// NOTE(review): the function name, its first parameter, the _hzRate
/// parameter and the call that creates publisher are not visible in this
/// excerpt.
/// \param[in] _queueLimit Defaults to 1000; forwarded together with
/// decodedTopic and _hzRate to a call whose target is not shown here.
149 public:
template<
typename M>
151 unsigned int _queueLimit = 1000,
157 decodedTopic, _queueLimit, _hzRate);
// Hold publisherMutex for the rest of the enclosing scope while the
// publisher is given this node and appended to this->publishers.
159 boost::mutex::scoped_lock lock(this->publisherMutex);
160 publisher->SetNode(shared_from_this());
161 this->publishers.push_back(publisher);
/// \brief Member template (on message type M and class T) that registers a
/// member-function callback receiving a boost::shared_ptr<M const>.
/// NOTE(review): the function name, its first parameter and several body
/// lines (including the declarations of ops and result) are not visible in
/// this excerpt.
/// \param[in] _fp Pointer to a member function of T taking
/// const boost::shared_ptr<M const> &.
/// \param[in] _obj Pointer to a T; presumably the object _fp is invoked
/// on -- confirm.
/// \param[in] _latching Defaults to false; forwarded to ops.Init.
173 public:
template<
typename M,
typename T>
175 void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
176 bool _latching =
false)
// Initialize ops for message type M with the decoded topic, this node and
// the latching flag.
180 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
// Hold the recursive incomingMutex for the rest of the enclosing scope.
183 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
// Store on result the id of the last callback in this topic's list
// (presumably the one appended on a line not shown here -- confirm).
191 result->SetCallbackId(this->callbacks[decodedTopic].back()->
GetId());
/// \brief Member template (on message type M) that registers a plain
/// function callback receiving a boost::shared_ptr<M const>.
/// NOTE(review): the function name, its first parameter and several body
/// lines (including the declarations of ops and result and the argument of
/// push_back) are not visible in this excerpt.
/// \param[in] _fp Pointer to a function taking
/// const boost::shared_ptr<M const> &.
/// \param[in] _latching Defaults to false; forwarded to ops.Init.
202 public:
template<
typename M>
204 void(*_fp)(
const boost::shared_ptr<M const> &),
205 bool _latching =
false)
// Initialize ops for message type M with the decoded topic, this node and
// the latching flag.
209 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
// Hold the recursive incomingMutex while the callback list is modified.
212 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
// Append to the callback list stored under decodedTopic (the appended
// value is on a line not shown here).
213 this->callbacks[decodedTopic].push_back(
// Store on result the id of the last callback in this topic's list.
220 result->SetCallbackId(this->callbacks[decodedTopic].back()->
GetId());
// Fragment of a function that registers a member-function callback taking
// the message as a const std::string &.
// NOTE(review): the template header, the function name, its first parameter
// and several body lines are not visible in this excerpt.
// _fp: pointer to a member function of T taking const std::string &.
// _obj: pointer to a T; presumably the object _fp is invoked on -- confirm.
// _latching: defaults to false; forwarded to ops.Init.
234 void(T::*_fp)(
const std::string &), T *_obj,
235 bool _latching =
false)
// Initialize ops with the decoded topic, this node and the latching flag
// (non-template Init call, unlike the typed-message variants).
239 ops.
Init(decodedTopic, shared_from_this(), _latching);
// Hold the recursive incomingMutex for the rest of the enclosing scope.
242 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
// Store on result the id of the last callback in this topic's list
// (presumably the one appended on a line not shown here -- confirm).
250 result->SetCallbackId(this->callbacks[decodedTopic].back()->
GetId());
// Fragment of a function that registers a plain function callback taking
// the message as a const std::string &.
// NOTE(review): the function name, its first parameter and several body
// lines (including the argument of push_back) are not visible in this
// excerpt.
// _fp: pointer to a function taking const std::string &.
// _latching: defaults to false; forwarded to ops.Init.
263 void(*_fp)(
const std::string &),
bool _latching =
false)
// Initialize ops with the decoded topic, this node and the latching flag.
267 ops.
Init(decodedTopic, shared_from_this(), _latching);
// Hold the recursive incomingMutex while the callback list is modified.
270 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
// Append to the callback list stored under decodedTopic (the appended
// value is on a line not shown here).
271 this->callbacks[decodedTopic].push_back(
// Store on result the id of the last callback in this topic's list.
278 result->SetCallbackId(this->callbacks[decodedTopic].back()->
GetId());
/// \brief Handle data for a topic. Only the declaration appears here.
/// \param[in] _topic Topic name, passed as a string.
/// \param[in] _msg Message data, passed as a string (presumably the
/// serialized message -- confirm against the definition).
/// \return A bool; its meaning is not established by this excerpt.
287 public:
bool HandleData(
const std::string &_topic,
288 const std::string &_msg);
// Final parameter of a different declaration whose start is not visible in
// this excerpt.
303 const std::string &_msg);
/// \brief Const query returning a string for a topic. Only the declaration
/// appears here.
/// \param[in] _topic Topic name.
/// \return A std::string; presumably the message type name associated with
/// _topic -- confirm against the definition.
317 public: std::string
GetMsgType(
const std::string &_topic)
const;
/// \brief Remove a callback. Only the declaration appears here.
/// \param[in] _topic Topic name.
/// \param[in] _id Unsigned id; presumably the callback id that the
/// subscribe functions pass to SetCallbackId -- confirm.
324 public:
void RemoveCallback(
const std::string &_topic,
unsigned int _id);
/// \brief Namespace string (presumably set from Init's _space -- confirm).
326 private: std::string topicNamespace;
/// \brief Publishers attached to this node; appended to while
/// publisherMutex is held.
327 private: std::vector<PublisherPtr> publishers;
/// \brief Iterator into publishers; its use is not visible in this excerpt.
328 private: std::vector<PublisherPtr>::iterator publishersIter;
/// \brief Static counter shared by all nodes (presumably the source of
/// per-node ids -- confirm).
329 private:
static unsigned int idCounter;
/// \brief Unsigned id of this node (presumably what GetId returns --
/// confirm).
330 private:
unsigned int id;
/// \brief List of callback helpers.
332 private:
typedef std::list<CallbackHelperPtr> Callback_L;
/// \brief Map from a string key to a list of callback helpers.
333 private:
typedef std::map<std::string, Callback_L> Callback_M;
/// \brief Callback lists, indexed by decoded topic name in the subscribe
/// functions.
334 private: Callback_M callbacks;
/// \brief Lists of strings keyed by string (presumably pending serialized
/// messages per topic -- confirm).
335 private: std::map<std::string, std::list<std::string> > incomingMsgs;
/// \brief Lists of MessagePtr keyed by string (presumably pending messages
/// from local publishers per topic -- confirm).
338 private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
/// \brief Mutex locked while a publisher is added to publishers.
340 private: boost::mutex publisherMutex;
/// \brief Mutex whose use is not visible in this excerpt.
341 private: boost::mutex publisherDeleteMutex;
/// \brief Recursive mutex locked by the subscribe functions around access
/// to callbacks.
342 private: boost::recursive_mutex incomingMutex;
/// \brief Recursive mutex whose use is not visible in this excerpt.
346 private: boost::recursive_mutex processIncomingMutex;
/// \brief Boolean flag (presumably set once Init has run -- confirm).
348 private:
bool initialized;