Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012-2015 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef _NODE_HH_
19 #define _NODE_HH_
20 
21 #include <tbb/task.h>
22 #include <boost/enable_shared_from_this.hpp>
23 #include <map>
24 #include <list>
25 #include <string>
26 #include <vector>
27 
30 #include "gazebo/util/system.hh"
31 
32 namespace gazebo
33 {
34  namespace transport
35  {
38  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
39  {
43  public: PublishTask(transport::PublisherPtr _pub,
44  const google::protobuf::Message &_message)
45  : pub(_pub)
46  {
47  this->msg = _message.New();
48  this->msg->CopyFrom(_message);
49  }
50 
53  public: tbb::task *execute()
54  {
55  this->pub->WaitForConnection();
56  this->pub->Publish(*this->msg, true);
57  this->pub->SendMessage();
58  delete this->msg;
59  this->pub.reset();
60  return NULL;
61  }
62 
64  private: transport::PublisherPtr pub;
65 
67  private: google::protobuf::Message *msg;
68  };
70 
73 
78  public boost::enable_shared_from_this<Node>
79  {
81  public: Node();
82 
84  public: virtual ~Node();
85 
90  public: void Init(const std::string &_space ="");
91 
93  public: void Fini();
94 
97  public: std::string GetTopicNamespace() const;
98 
102  public: std::string DecodeTopicName(const std::string &_topic);
103 
107  public: std::string EncodeTopicName(const std::string &_topic);
108 
111  public: unsigned int GetId() const;
112 
115  public: void ProcessPublishers();
116 
118  public: void ProcessIncoming();
119 
123  public: bool HasLatchedSubscriber(const std::string &_topic) const;
124 
125 
132  public: template<typename M>
133  void Publish(const std::string &_topic,
134  const google::protobuf::Message &_message)
135  {
136  transport::PublisherPtr pub = this->Advertise<M>(_topic);
137  PublishTask *task = new(tbb::task::allocate_root())
138  PublishTask(pub, _message);
139 
140  tbb::task::enqueue(*task);
141  return;
142  }
143 
151  public: template<typename M>
152  transport::PublisherPtr Advertise(const std::string &_topic,
153  unsigned int _queueLimit = 1000,
154  double _hzRate = 0)
155  {
156  std::string decodedTopic = this->DecodeTopicName(_topic);
157  PublisherPtr publisher =
158  transport::TopicManager::Instance()->Advertise<M>(
159  decodedTopic, _queueLimit, _hzRate);
160 
161  boost::mutex::scoped_lock lock(this->publisherMutex);
162  publisher->SetNode(shared_from_this());
163  this->publishers.push_back(publisher);
164 
165  return publisher;
166  }
167 
175  public: template<typename M, typename T>
176  SubscriberPtr Subscribe(const std::string &_topic,
177  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
178  bool _latching = false)
179  {
180  SubscribeOptions ops;
181  std::string decodedTopic = this->DecodeTopicName(_topic);
182  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
183 
184  {
185  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
186  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
187  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
188  }
189 
190  SubscriberPtr result =
191  transport::TopicManager::Instance()->Subscribe(ops);
192 
193  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
194 
195  return result;
196  }
197 
204  public: template<typename M>
205  SubscriberPtr Subscribe(const std::string &_topic,
206  void(*_fp)(const boost::shared_ptr<M const> &),
207  bool _latching = false)
208  {
209  SubscribeOptions ops;
210  std::string decodedTopic = this->DecodeTopicName(_topic);
211  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
212 
213  {
214  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
215  this->callbacks[decodedTopic].push_back(
216  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
217  }
218 
219  SubscriberPtr result =
220  transport::TopicManager::Instance()->Subscribe(ops);
221 
222  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
223 
224  return result;
225  }
226 
234  template<typename T>
235  SubscriberPtr Subscribe(const std::string &_topic,
236  void(T::*_fp)(const std::string &), T *_obj,
237  bool _latching = false)
238  {
239  SubscribeOptions ops;
240  std::string decodedTopic = this->DecodeTopicName(_topic);
241  ops.Init(decodedTopic, shared_from_this(), _latching);
242 
243  {
244  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
245  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
246  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
247  }
248 
249  SubscriberPtr result =
250  transport::TopicManager::Instance()->Subscribe(ops);
251 
252  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
253 
254  return result;
255  }
256 
257 
264  SubscriberPtr Subscribe(const std::string &_topic,
265  void(*_fp)(const std::string &), bool _latching = false)
266  {
267  SubscribeOptions ops;
268  std::string decodedTopic = this->DecodeTopicName(_topic);
269  ops.Init(decodedTopic, shared_from_this(), _latching);
270 
271  {
272  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
273  this->callbacks[decodedTopic].push_back(
275  }
276 
277  SubscriberPtr result =
278  transport::TopicManager::Instance()->Subscribe(ops);
279 
280  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
281 
282  return result;
283  }
284 
289  public: bool HandleData(const std::string &_topic,
290  const std::string &_msg);
291 
296  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
297 
304  public: void InsertLatchedMsg(const std::string &_topic,
305  const std::string &_msg);
306 
313  public: void InsertLatchedMsg(const std::string &_topic,
314  MessagePtr _msg);
315 
319  public: std::string GetMsgType(const std::string &_topic) const;
320 
326  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
327 
328  private: std::string topicNamespace;
329  private: std::vector<PublisherPtr> publishers;
330  private: std::vector<PublisherPtr>::iterator publishersIter;
331  private: static unsigned int idCounter;
332  private: unsigned int id;
333 
334  private: typedef std::list<CallbackHelperPtr> Callback_L;
335  private: typedef std::map<std::string, Callback_L> Callback_M;
336  private: Callback_M callbacks;
337  private: std::map<std::string, std::list<std::string> > incomingMsgs;
338 
340  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
341 
342  private: boost::mutex publisherMutex;
343  private: boost::mutex publisherDeleteMutex;
344  private: boost::recursive_mutex incomingMutex;
345 
348  private: boost::recursive_mutex processIncomingMutex;
349 
350  private: bool initialized;
351  };
353  }
354 }
355 #endif
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:235
Options for a subscription.
Definition: SubscribeOptions.hh:35
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Adverise a topic.
Definition: Node.hh:152
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
Forward declarations for transport.
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
Callback helper Template.
Definition: CallbackHelper.hh:107
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:176
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:205
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:77
#define NULL
Definition: CommonTypes.hh:30
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:264
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:133
#define GZ_TRANSPORT_VISIBLE
Definition: system.hh:166
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:171
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:101