22 #include <boost/enable_shared_from_this.hpp>
44 const google::protobuf::Message &_message)
47 this->msg = _message.New();
48 this->msg->CopyFrom(_message);
53 public: tbb::task *execute()
55 this->pub->WaitForConnection();
56 this->pub->Publish(*this->msg,
true);
57 this->pub->SendMessage();
67 private: google::protobuf::Message *msg;
78 public boost::enable_shared_from_this<Node>
/// \brief Destructor. Declared virtual, so the class may be destroyed
/// through a pointer to a base type.
84 public:
virtual ~
Node();
/// \brief Initialize the node.
/// \param[in] _space Optional string; defaults to the empty string when
/// omitted. NOTE(review): presumably the topic namespace for this node
/// (compare the topicNamespace member) -- confirm against the
/// implementation.
90 public:
void Init(
const std::string &_space =
"");
/// \brief Const accessor returning a string by value.
/// \return NOTE(review): presumably the value of the topicNamespace
/// member -- confirm against the implementation.
97 public: std::string GetTopicNamespace()
const;
/// \brief Convert a topic name into its decoded form.
/// Advertise and every Subscribe overload pass the caller-supplied topic
/// through this function and then use the result (for example as the key
/// into the callbacks map).
/// \param[in] _topic Topic name to decode.
/// \return The decoded topic name. NOTE(review): the exact transformation
/// (e.g. namespace prefixing) is defined in the implementation file --
/// confirm there.
102 public: std::string DecodeTopicName(
const std::string &_topic);
/// \brief Convert a topic name into its encoded form.
/// \param[in] _topic Topic name to encode.
/// \return The encoded topic name. NOTE(review): presumably the inverse of
/// DecodeTopicName -- confirm against the implementation.
107 public: std::string EncodeTopicName(
const std::string &_topic);
/// \brief Const accessor returning an unsigned integer identifier.
/// \return NOTE(review): presumably the value of the id member -- confirm
/// against the implementation.
111 public:
unsigned int GetId()
const;
/// \brief Process the publishers of this node.
/// NOTE(review): presumably iterates the publishers vector and sends any
/// pending messages -- only the declaration is visible here; confirm
/// against the implementation.
115 public:
void ProcessPublishers();
/// \brief Process incoming messages.
/// NOTE(review): presumably drains incomingMsgs / incomingMsgsLocal and
/// dispatches them to the registered callbacks -- only the declaration is
/// visible here; confirm against the implementation.
118 public:
void ProcessIncoming();
/// \brief Query whether a topic has a latched subscriber.
/// \param[in] _topic Name of the topic to check.
/// \return Boolean result of the query. NOTE(review): presumably true when
/// at least one subscriber on _topic was created with latching enabled --
/// confirm against the implementation.
123 public:
bool HasLatchedSubscriber(
const std::string &_topic)
const;
132 public:
template<
typename M>
134 const google::protobuf::Message &_message)
137 PublishTask *task =
new(tbb::task::allocate_root())
138 PublishTask(pub, _message);
140 tbb::task::enqueue(*task);
151 public:
template<
typename M>
153 unsigned int _queueLimit = 1000,
156 std::string decodedTopic = this->DecodeTopicName(_topic);
159 decodedTopic, _queueLimit, _hzRate);
161 boost::mutex::scoped_lock lock(this->publisherMutex);
162 publisher->SetNode(shared_from_this());
163 this->publishers.push_back(publisher);
175 public:
template<
typename M,
typename T>
177 void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
178 bool _latching =
false)
181 std::string decodedTopic = this->DecodeTopicName(_topic);
182 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
185 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
193 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
204 public:
template<
typename M>
206 void(*_fp)(
const boost::shared_ptr<M const> &),
207 bool _latching =
false)
210 std::string decodedTopic = this->DecodeTopicName(_topic);
211 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
214 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
215 this->callbacks[decodedTopic].push_back(
222 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
236 void(T::*_fp)(
const std::string &), T *_obj,
237 bool _latching =
false)
240 std::string decodedTopic = this->DecodeTopicName(_topic);
241 ops.
Init(decodedTopic, shared_from_this(), _latching);
244 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
252 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
265 void(*_fp)(
const std::string &),
bool _latching =
false)
268 std::string decodedTopic = this->DecodeTopicName(_topic);
269 ops.
Init(decodedTopic, shared_from_this(), _latching);
272 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
273 this->callbacks[decodedTopic].push_back(
280 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
/// \brief Handle data received for a topic, passed as a string.
/// \param[in] _topic Name of the topic the data belongs to.
/// \param[in] _msg The data, as a string. NOTE(review): presumably a
/// serialized message -- confirm against the implementation.
/// \return Boolean status. NOTE(review): the meaning of true/false is not
/// visible from the declaration -- confirm against the implementation.
289 public:
bool HandleData(
const std::string &_topic,
290 const std::string &_msg);
/// \brief Handle a message received for a topic, passed as a message
/// pointer rather than as a string (compare HandleData).
/// \param[in] _topic Name of the topic the message belongs to.
/// \param[in] _msg Shared pointer to the message (MessagePtr).
/// \return Boolean status. NOTE(review): the meaning of true/false is not
/// visible from the declaration -- confirm against the implementation.
296 public:
bool HandleMessage(
const std::string &_topic,
MessagePtr _msg);
/// \brief Insert a latched message, given as a string, for a topic.
/// \param[in] _topic Name of the topic.
/// \param[in] _msg The message, as a string. NOTE(review): presumably a
/// serialized message that is delivered to latched subscribers -- confirm
/// against the implementation.
304 public:
void InsertLatchedMsg(
const std::string &_topic,
305 const std::string &_msg);
313 public:
void InsertLatchedMsg(
const std::string &_topic,
/// \brief Get the message type associated with a topic.
/// \param[in] _topic Name of the topic.
/// \return The message type as a string. NOTE(review): the result for an
/// unknown topic is not visible from the declaration -- confirm against
/// the implementation.
319 public: std::string GetMsgType(
const std::string &_topic)
const;
/// \brief Remove a callback registered on a topic.
/// \param[in] _topic Name of the topic.
/// \param[in] _id Identifier of the callback to remove. NOTE(review):
/// presumably the CallbackHelper id that the Subscribe overloads store on
/// the returned subscriber via SetCallbackId -- confirm against the
/// implementation.
326 public:
void RemoveCallback(
const std::string &_topic,
unsigned int _id);
/// String holding this node's topic namespace (name-based; see
/// GetTopicNamespace).
328 private: std::string topicNamespace;
/// Publishers created by Advertise, which appends to this vector while
/// holding publisherMutex.
329 private: std::vector<PublisherPtr> publishers;
/// Iterator into the publishers vector. NOTE(review): its use is not
/// visible in this header -- confirm against the implementation.
330 private: std::vector<PublisherPtr>::iterator publishersIter;
/// Counter shared by all Node instances (static). NOTE(review):
/// presumably the source of unique node ids -- confirm.
331 private:
static unsigned int idCounter;
/// Identifier of this node (see GetId).
332 private:
unsigned int id;
/// List of callback helpers.
334 private:
typedef std::list<CallbackHelperPtr> Callback_L;
/// Map from a string key to a list of callback helpers.
335 private:
typedef std::map<std::string, Callback_L> Callback_M;
/// Registered callbacks, keyed by decoded topic name. The Subscribe
/// overloads access this map while holding incomingMutex.
336 private: Callback_M callbacks;
/// Incoming messages stored as strings, grouped by a string key.
/// NOTE(review): presumably keyed by topic name -- confirm.
337 private: std::map<std::string, std::list<std::string> > incomingMsgs;
/// Incoming messages stored as message pointers, grouped by a string key.
/// NOTE(review): presumably messages published within the same process,
/// keyed by topic name -- confirm.
340 private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
/// Locked by Advertise while it registers a new publisher in the
/// publishers vector.
342 private: boost::mutex publisherMutex;
/// NOTE(review): presumably guards removal of publishers -- its use is not
/// visible in this header; confirm against the implementation.
343 private: boost::mutex publisherDeleteMutex;
/// Recursive mutex locked by the Subscribe overloads while they modify
/// the callbacks map.
344 private: boost::recursive_mutex incomingMutex;
/// NOTE(review): presumably held while ProcessIncoming runs -- its use is
/// not visible in this header; confirm against the implementation.
348 private: boost::recursive_mutex processIncomingMutex;
/// NOTE(review): presumably set once Init has completed -- confirm against
/// the implementation.
350 private:
bool initialized;
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:235
Options for a subscription.
Definition: SubscribeOptions.hh:35
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:152
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
Forward declarations for transport.
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
Callback helper Template.
Definition: CallbackHelper.hh:107
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:176
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:205
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topics.
Definition: Node.hh:77
#define NULL
Definition: CommonTypes.hh:30
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:264
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:133
#define GZ_TRANSPORT_VISIBLE
Definition: system.hh:166
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:171
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:101