22 #include <boost/enable_shared_from_this.hpp>
44 const google::protobuf::Message &_message)
47 this->msg = _message.New();
48 this->msg->CopyFrom(_message);
53 public: tbb::task *execute()
55 this->pub->WaitForConnection();
56 this->pub->Publish(*this->msg,
true);
57 this->pub->SendMessage();
67 private: google::protobuf::Message *msg;
83 public:
virtual ~
Node();
89 public:
void Init(
const std::string &_space =
"");
96 public: std::string GetTopicNamespace()
const;
101 public: std::string DecodeTopicName(
const std::string &_topic);
106 public: std::string EncodeTopicName(
const std::string &_topic);
110 public:
unsigned int GetId()
const;
114 public:
void ProcessPublishers();
117 public:
void ProcessIncoming();
122 public:
bool HasLatchedSubscriber(
const std::string &_topic)
const;
131 public:
template<
typename M>
133 const google::protobuf::Message &_message)
136 PublishTask *task =
new(tbb::task::allocate_root())
137 PublishTask(pub, _message);
139 tbb::task::enqueue(*task);
150 public:
template<
typename M>
152 unsigned int _queueLimit = 1000,
155 std::string decodedTopic = this->DecodeTopicName(_topic);
158 decodedTopic, _queueLimit, _hzRate);
160 boost::mutex::scoped_lock lock(this->publisherMutex);
161 publisher->SetNode(shared_from_this());
162 this->publishers.push_back(publisher);
174 public:
template<
typename M,
typename T>
176 void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
177 bool _latching =
false)
180 std::string decodedTopic = this->DecodeTopicName(_topic);
181 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
184 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
192 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
203 public:
template<
typename M>
205 void(*_fp)(
const boost::shared_ptr<M const> &),
206 bool _latching =
false)
209 std::string decodedTopic = this->DecodeTopicName(_topic);
210 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
213 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
214 this->callbacks[decodedTopic].push_back(
221 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
235 void(T::*_fp)(
const std::string &), T *_obj,
236 bool _latching =
false)
239 std::string decodedTopic = this->DecodeTopicName(_topic);
240 ops.
Init(decodedTopic, shared_from_this(), _latching);
243 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
251 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
264 void(*_fp)(
const std::string &),
bool _latching =
false)
267 std::string decodedTopic = this->DecodeTopicName(_topic);
268 ops.
Init(decodedTopic, shared_from_this(), _latching);
271 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
272 this->callbacks[decodedTopic].push_back(
279 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
288 public:
bool HandleData(
const std::string &_topic,
289 const std::string &_msg);
295 public:
bool HandleMessage(
const std::string &_topic,
MessagePtr _msg);
303 public:
void InsertLatchedMsg(
const std::string &_topic,
304 const std::string &_msg);
312 public:
void InsertLatchedMsg(
const std::string &_topic,
318 public: std::string GetMsgType(
const std::string &_topic)
const;
325 public:
void RemoveCallback(
const std::string &_topic,
unsigned int _id);
327 private: std::string topicNamespace;
328 private: std::vector<PublisherPtr> publishers;
329 private: std::vector<PublisherPtr>::iterator publishersIter;
330 private:
static unsigned int idCounter;
331 private:
unsigned int id;
333 private:
typedef std::list<CallbackHelperPtr> Callback_L;
334 private:
typedef std::map<std::string, Callback_L> Callback_M;
335 private: Callback_M callbacks;
336 private: std::map<std::string, std::list<std::string> > incomingMsgs;
339 private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
341 private: boost::mutex publisherMutex;
342 private: boost::mutex publisherDeleteMutex;
343 private: boost::recursive_mutex incomingMutex;
347 private: boost::recursive_mutex processIncomingMutex;
349 private:
bool initialized;