18 #ifndef GAZEBO_TRANSPORT_NODE_HH_ 19 #define GAZEBO_TRANSPORT_NODE_HH_ 22 #include <boost/bind.hpp> 23 #include <boost/enable_shared_from_this.hpp> 39 class GZ_TRANSPORT_VISIBLE PublishTask :
public tbb::task
45 const google::protobuf::Message &_message)
48 this->msg = _message.New();
49 this->msg->CopyFrom(_message);
54 public: tbb::task *execute()
56 this->pub->WaitForConnection();
57 this->pub->Publish(*this->msg,
true);
58 this->pub->SendMessage();
68 private: google::protobuf::Message *msg;
78 class GZ_TRANSPORT_VISIBLE
Node :
79 public boost::enable_shared_from_this<Node>
85 public:
virtual ~
Node();
95 public:
void Init(
const std::string &_space =
"");
108 public:
bool TryInit(
/// \brief Query whether this node has been initialized.
/// NOTE(review): presumably reports the private `initialized` flag; the
/// definition is not part of this header, so confirm in the .cc file.
/// \return Boolean initialization state.
115 public:
bool IsInitialized()
const;
/// \brief Get the topic namespace associated with this node.
/// NOTE(review): presumably returns the `topicNamespace` member - confirm.
/// \return The namespace, by value.
122 public: std::string GetTopicNamespace()
const;
/// \brief Decode a topic name.
/// Every Advertise/Subscribe overload in this header passes its `_topic`
/// argument through this function first and then uses the decoded name
/// (for example as the key into `callbacks`).
/// \param[in] _topic Topic name as supplied by the caller.
/// \return The decoded topic name.
127 public: std::string DecodeTopicName(
const std::string &_topic);
/// \brief Encode a topic name.
/// NOTE(review): presumably the inverse of DecodeTopicName - confirm
/// against the implementation.
/// \param[in] _topic Topic name to encode.
/// \return The encoded topic name.
132 public: std::string EncodeTopicName(
const std::string &_topic);
/// \brief Get the numeric id of this node.
/// NOTE(review): presumably returns the `id` member - confirm.
/// \return The node id.
136 public:
unsigned int GetId()
const;
/// \brief Process the node's publishers.
/// NOTE(review): only the declaration is visible here; what "process"
/// entails (and which mutexes it takes) must be checked in the definition.
140 public:
void ProcessPublishers();
/// \brief Process incoming messages.
/// NOTE(review): only the declaration is visible here; presumably drains
/// `incomingMsgs` / `incomingMsgsLocal` - confirm in the definition.
143 public:
void ProcessIncoming();
/// \brief Check whether a topic has a latched subscriber.
/// \param[in] _topic Topic name to check.
/// NOTE(review): whether `_topic` is expected encoded or decoded is not
/// visible from this header - confirm.
/// \return True if a latched subscriber exists for the topic.
148 public:
bool HasLatchedSubscriber(
const std::string &_topic)
const;
157 public:
template<
typename M>
159 const google::protobuf::Message &_message)
162 PublishTask *task =
new(tbb::task::allocate_root())
163 PublishTask(pub, _message);
165 tbb::task::enqueue(*task);
176 public:
template<
typename M>
178 unsigned int _queueLimit = 1000,
181 std::string decodedTopic = this->DecodeTopicName(_topic);
184 decodedTopic, _queueLimit, _hzRate);
186 boost::mutex::scoped_lock lock(this->publisherMutex);
187 publisher->SetNode(shared_from_this());
188 this->publishers.push_back(publisher);
199 public:
void Publish(
const std::string &_topic,
200 const google::protobuf::Message &_message)
203 _message.GetTypeName());
204 pub->WaitForConnection();
206 pub->Publish(_message,
true);
217 const std::string &_msgTypeName,
218 unsigned int _queueLimit = 1000,
221 std::string decodedTopic = this->DecodeTopicName(_topic);
224 decodedTopic, _msgTypeName, _queueLimit, _hzRate);
226 boost::mutex::scoped_lock lock(this->publisherMutex);
227 publisher->SetNode(shared_from_this());
228 this->publishers.push_back(publisher);
240 public:
template<
typename M,
typename T>
242 void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
243 bool _latching =
false)
246 std::string decodedTopic = this->DecodeTopicName(_topic);
247 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
250 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
258 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
269 public:
template<
typename M>
271 void(*_fp)(
const boost::shared_ptr<M const> &),
272 bool _latching =
false)
275 std::string decodedTopic = this->DecodeTopicName(_topic);
276 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
279 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
280 this->callbacks[decodedTopic].push_back(
287 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
301 void(T::*_fp)(
const std::string &), T *_obj,
302 bool _latching =
false)
305 std::string decodedTopic = this->DecodeTopicName(_topic);
306 ops.
Init(decodedTopic, shared_from_this(), _latching);
309 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
317 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
330 void(*_fp)(
const std::string &),
bool _latching =
false)
333 std::string decodedTopic = this->DecodeTopicName(_topic);
334 ops.
Init(decodedTopic, shared_from_this(), _latching);
337 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
338 this->callbacks[decodedTopic].push_back(
345 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
/// \brief Handle incoming data for a topic, given as a std::string.
/// NOTE(review): presumably `_msg` is a serialized message and is queued in
/// `incomingMsgs` (which stores std::string per topic) - confirm.
/// \param[in] _topic Topic the data belongs to.
/// \param[in] _msg The data.
/// \return Boolean result; its meaning is defined by the implementation,
/// which is not visible here.
354 public:
bool HandleData(
const std::string &_topic,
355 const std::string &_msg);
/// \brief Handle an incoming message for a topic, given as a MessagePtr.
/// NOTE(review): presumably the counterpart of HandleData for messages that
/// are already message objects, queued in `incomingMsgsLocal` (which stores
/// MessagePtr per topic) - confirm.
/// \param[in] _topic Topic the message belongs to.
/// \param[in] _msg Shared pointer to the message.
/// \return Boolean result; its meaning is defined by the implementation,
/// which is not visible here.
361 public:
bool HandleMessage(
const std::string &_topic,
MessagePtr _msg);
/// \brief Insert a latched message, given as a std::string, for a topic.
/// NOTE(review): the delivery semantics of latched messages are not visible
/// from this header - see the implementation.
/// \param[in] _topic Topic the latched message belongs to.
/// \param[in] _msg The message data.
369 public:
void InsertLatchedMsg(
const std::string &_topic,
370 const std::string &_msg);
378 public:
void InsertLatchedMsg(
const std::string &_topic,
/// \brief Get the message type name for a topic.
/// \param[in] _topic Topic to look up.
/// \return The message type as a string. NOTE(review): the value returned
/// for an unknown topic is not visible from this header - confirm.
384 public: std::string GetMsgType(
const std::string &_topic)
const;
/// \brief Remove a callback registered on a topic.
/// \param[in] _topic Topic the callback was registered on.
/// \param[in] _id Id of the callback. NOTE(review): presumably the value the
/// Subscribe overloads pass to SetCallbackId(), i.e. the GetId() of the
/// callback helper stored in `callbacks` - confirm.
391 public:
void RemoveCallback(
const std::string &_topic,
unsigned int _id);
/// \brief Private initialization helper.
/// NOTE(review): presumably the shared implementation behind Init() and
/// TryInit(); the meaning of the return value and of
/// `_fallbackToDefault` must be confirmed in the definition.
/// \param[in] _space Namespace to initialize with.
/// \param[in] _fallbackToDefault Fallback flag (semantics not visible here).
/// \return Boolean result of the initialization.
404 private:
bool PrivateInit(
const std::string &_space,
406 const bool _fallbackToDefault);
/// \brief Topic namespace of this node.
408 private: std::string topicNamespace;
/// \brief Publishers created through this node. The Advertise overloads
/// append to this vector while holding `publisherMutex`.
409 private: std::vector<PublisherPtr> publishers;
/// \brief Iterator into `publishers`.
/// NOTE(review): its use is not visible in this header - confirm where it
/// is advanced and which mutex protects it.
410 private: std::vector<PublisherPtr>::iterator publishersIter;
/// \brief Counter shared by all nodes (static).
/// NOTE(review): presumably the source of each node's `id` - confirm.
411 private:
static unsigned int idCounter;
/// \brief Numeric id of this node.
412 private:
unsigned int id;
/// \brief List of callback helpers.
414 private:
typedef std::list<CallbackHelperPtr> Callback_L;
/// \brief Map from topic name to the list of callback helpers on that topic.
415 private:
typedef std::map<std::string, Callback_L> Callback_M;
/// \brief Registered subscription callbacks. The Subscribe overloads index
/// this map with the decoded topic name while holding `incomingMutex`.
416 private: Callback_M callbacks;
/// \brief Per-topic lists of messages stored as std::string.
/// NOTE(review): presumably serialized incoming messages awaiting
/// ProcessIncoming() - confirm.
417 private: std::map<std::string, std::list<std::string> > incomingMsgs;
/// \brief Per-topic lists of messages stored as MessagePtr.
/// NOTE(review): presumably messages delivered as objects rather than as
/// serialized strings - confirm.
420 private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
/// \brief Mutex held by the Advertise overloads while they attach the new
/// publisher to this node and push it onto `publishers`.
422 private: boost::mutex publisherMutex;
/// \brief Mutex related to publisher deletion.
/// NOTE(review): no use is visible in this header - confirm in the .cc file.
423 private: boost::mutex publisherDeleteMutex;
/// \brief Recursive mutex held by the Subscribe overloads while they modify
/// `callbacks`.
424 private: boost::recursive_mutex incomingMutex;
/// \brief Recursive mutex related to processing incoming messages.
/// NOTE(review): no use is visible in this header - confirm in the .cc file.
428 private: boost::recursive_mutex processIncomingMutex;
/// \brief Initialization flag for this node.
430 private:
bool initialized;
Options for a subscription.
Definition: SubscribeOptions.hh:35
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:177
Forward declarations for the common classes.
Definition: Animation.hh:26
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:300
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:199
Forward declarations for transport.
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
Boost shared pointer to transport::CallbackHelper.
Definition: CallbackHelper.hh:105
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:241
Callback helper Template.
Definition: CallbackHelper.hh:111
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:270
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:216
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topics.
Definition: Node.hh:78
#define NULL
Definition: CommonTypes.hh:31
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:329
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:158
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:177
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
A Time class, can be used to hold wall- or sim-time.
Definition: Time.hh:44
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36