Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
20 
21 #include <tbb/task.h>
22 #include <boost/bind.hpp>
23 #include <boost/enable_shared_from_this.hpp>
24 #include <map>
25 #include <list>
26 #include <string>
27 #include <vector>
28 
31 #include "gazebo/util/system.hh"
32 
33 namespace gazebo
34 {
35  namespace transport
36  {
39  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
40  {
44  public: PublishTask(transport::PublisherPtr _pub,
45  const google::protobuf::Message &_message)
46  : pub(_pub)
47  {
48  this->msg = _message.New();
49  this->msg->CopyFrom(_message);
50  }
51 
54  public: tbb::task *execute()
55  {
56  this->pub->WaitForConnection();
57  this->pub->Publish(*this->msg, true);
58  this->pub->SendMessage();
59  delete this->msg;
60  this->pub.reset();
61  return NULL;
62  }
63 
65  private: transport::PublisherPtr pub;
66 
68  private: google::protobuf::Message *msg;
69  };
71 
74 
78  class GZ_TRANSPORT_VISIBLE Node :
79  public boost::enable_shared_from_this<Node>
80  {
82  public: Node();
83 
85  public: virtual ~Node();
86 
95  public: void Init(const std::string &_space ="");
96 
108  public: bool TryInit(
109  const common::Time &_maxWait = common::Time(1, 0));
110 
115  public: bool IsInitialized() const;
116 
118  public: void Fini();
119 
122  public: std::string GetTopicNamespace() const;
123 
127  public: std::string DecodeTopicName(const std::string &_topic);
128 
132  public: std::string EncodeTopicName(const std::string &_topic);
133 
136  public: unsigned int GetId() const;
137 
140  public: void ProcessPublishers();
141 
143  public: void ProcessIncoming();
144 
148  public: bool HasLatchedSubscriber(const std::string &_topic) const;
149 
150 
157  public: template<typename M>
158  void Publish(const std::string &_topic,
159  const google::protobuf::Message &_message)
160  {
161  transport::PublisherPtr pub = this->Advertise<M>(_topic);
162  PublishTask *task = new(tbb::task::allocate_root())
163  PublishTask(pub, _message);
164 
165  tbb::task::enqueue(*task);
166  return;
167  }
168 
176  public: template<typename M>
177  transport::PublisherPtr Advertise(const std::string &_topic,
178  unsigned int _queueLimit = 1000,
179  double _hzRate = 0)
180  {
181  std::string decodedTopic = this->DecodeTopicName(_topic);
182  PublisherPtr publisher =
183  transport::TopicManager::Instance()->Advertise<M>(
184  decodedTopic, _queueLimit, _hzRate);
185 
186  boost::mutex::scoped_lock lock(this->publisherMutex);
187  publisher->SetNode(shared_from_this());
188  this->publishers.push_back(publisher);
189 
190  return publisher;
191  }
192 
199  public: void Publish(const std::string &_topic,
200  const google::protobuf::Message &_message)
201  {
202  transport::PublisherPtr pub = this->Advertise(_topic,
203  _message.GetTypeName());
204  pub->WaitForConnection();
205 
206  pub->Publish(_message, true);
207  }
208 
216  public: transport::PublisherPtr Advertise(const std::string &_topic,
217  const std::string &_msgTypeName,
218  unsigned int _queueLimit = 1000,
219  double _hzRate = 0)
220  {
221  std::string decodedTopic = this->DecodeTopicName(_topic);
222  PublisherPtr publisher =
224  decodedTopic, _msgTypeName, _queueLimit, _hzRate);
225 
226  boost::mutex::scoped_lock lock(this->publisherMutex);
227  publisher->SetNode(shared_from_this());
228  this->publishers.push_back(publisher);
229 
230  return publisher;
231  }
232 
240  public: template<typename M, typename T>
241  SubscriberPtr Subscribe(const std::string &_topic,
242  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
243  bool _latching = false)
244  {
245  SubscribeOptions ops;
246  std::string decodedTopic = this->DecodeTopicName(_topic);
247  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
248 
249  {
250  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
251  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
252  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
253  }
254 
255  SubscriberPtr result =
256  transport::TopicManager::Instance()->Subscribe(ops);
257 
258  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
259 
260  return result;
261  }
262 
269  public: template<typename M>
270  SubscriberPtr Subscribe(const std::string &_topic,
271  void(*_fp)(const boost::shared_ptr<M const> &),
272  bool _latching = false)
273  {
274  SubscribeOptions ops;
275  std::string decodedTopic = this->DecodeTopicName(_topic);
276  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
277 
278  {
279  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
280  this->callbacks[decodedTopic].push_back(
281  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
282  }
283 
284  SubscriberPtr result =
285  transport::TopicManager::Instance()->Subscribe(ops);
286 
287  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
288 
289  return result;
290  }
291 
299  template<typename T>
300  SubscriberPtr Subscribe(const std::string &_topic,
301  void(T::*_fp)(const std::string &), T *_obj,
302  bool _latching = false)
303  {
304  SubscribeOptions ops;
305  std::string decodedTopic = this->DecodeTopicName(_topic);
306  ops.Init(decodedTopic, shared_from_this(), _latching);
307 
308  {
309  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
310  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
311  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
312  }
313 
314  SubscriberPtr result =
315  transport::TopicManager::Instance()->Subscribe(ops);
316 
317  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
318 
319  return result;
320  }
321 
322 
329  SubscriberPtr Subscribe(const std::string &_topic,
330  void(*_fp)(const std::string &), bool _latching = false)
331  {
332  SubscribeOptions ops;
333  std::string decodedTopic = this->DecodeTopicName(_topic);
334  ops.Init(decodedTopic, shared_from_this(), _latching);
335 
336  {
337  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
338  this->callbacks[decodedTopic].push_back(
340  }
341 
342  SubscriberPtr result =
343  transport::TopicManager::Instance()->Subscribe(ops);
344 
345  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
346 
347  return result;
348  }
349 
354  public: bool HandleData(const std::string &_topic,
355  const std::string &_msg);
356 
361  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
362 
369  public: void InsertLatchedMsg(const std::string &_topic,
370  const std::string &_msg);
371 
378  public: void InsertLatchedMsg(const std::string &_topic,
379  MessagePtr _msg);
380 
384  public: std::string GetMsgType(const std::string &_topic) const;
385 
391  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
392 
404  private: bool PrivateInit(const std::string &_space,
405  const common::Time &_maxWait,
406  const bool _fallbackToDefault);
407 
408  private: std::string topicNamespace;
409  private: std::vector<PublisherPtr> publishers;
410  private: std::vector<PublisherPtr>::iterator publishersIter;
411  private: static unsigned int idCounter;
412  private: unsigned int id;
413 
414  private: typedef std::list<CallbackHelperPtr> Callback_L;
415  private: typedef std::map<std::string, Callback_L> Callback_M;
416  private: Callback_M callbacks;
417  private: std::map<std::string, std::list<std::string> > incomingMsgs;
418 
420  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
421 
422  private: boost::mutex publisherMutex;
423  private: boost::mutex publisherDeleteMutex;
424  private: boost::recursive_mutex incomingMutex;
425 
428  private: boost::recursive_mutex processIncomingMutex;
429 
430  private: bool initialized;
431  };
433  }
434 }
435 #endif
Options for a subscription.
Definition: SubscribeOptions.hh:35
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:177
Forward declarations for the common classes.
Definition: Animation.hh:26
transport
Definition: ConnectionManager.hh:35
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:300
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:199
Forward declarations for transport.
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:241
Callback helper Template.
Definition: CallbackHelper.hh:111
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:270
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:216
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:78
#define NULL
Definition: CommonTypes.hh:31
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:329
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:158
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:177
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
A Time class, can be used to hold wall- or sim-time.
Definition: Time.hh:47
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36