All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef _NODE_HH_
19 #define _NODE_HH_
20 
21 #include <tbb/task.h>
22 #include <boost/enable_shared_from_this.hpp>
23 #include <map>
24 #include <list>
25 #include <string>
26 #include <vector>
27 
30 
31 namespace gazebo
32 {
33  namespace transport
34  {
37  class PublishTask : public tbb::task
38  {
43  const google::protobuf::Message &_message)
44  : pub(_pub)
45  {
46  this->msg = _message.New();
47  this->msg->CopyFrom(_message);
48  }
49 
52  public: tbb::task *execute()
53  {
54  this->pub->WaitForConnection();
55  this->pub->Publish(*this->msg, true);
56  delete this->msg;
57  this->pub.reset();
58  return NULL;
59  }
60 
62  private: transport::PublisherPtr pub;
63 
65  private: google::protobuf::Message *msg;
66  };
68 
71 
75  class Node : public boost::enable_shared_from_this<Node>
76  {
78  public: Node();
79 
81  public: virtual ~Node();
82 
87  public: void Init(const std::string &_space ="");
88 
90  public: void Fini();
91 
94  public: std::string GetTopicNamespace() const;
95 
99  public: std::string DecodeTopicName(const std::string &_topic);
100 
104  public: std::string EncodeTopicName(const std::string &_topic);
105 
108  public: unsigned int GetId() const;
109 
112  public: void ProcessPublishers();
113 
115  public: void ProcessIncoming();
116 
120  public: bool HasLatchedSubscriber(const std::string &_topic) const;
121 
122 
129  public: template<typename M>
130  void Publish(const std::string &_topic,
131  const google::protobuf::Message &_message)
132  {
133  transport::PublisherPtr pub = this->Advertise<M>(_topic);
134  PublishTask *task = new(tbb::task::allocate_root())
135  PublishTask(pub, _message);
136 
137  tbb::task::enqueue(*task);
138  return;
139  }
140 
148  public: template<typename M>
149  transport::PublisherPtr Advertise(const std::string &_topic,
150  unsigned int _queueLimit = 1000,
151  double _hzRate = 0)
152  {
153  std::string decodedTopic = this->DecodeTopicName(_topic);
154  PublisherPtr publisher =
155  transport::TopicManager::Instance()->Advertise<M>(
156  decodedTopic, _queueLimit, _hzRate);
157 
158  boost::recursive_mutex::scoped_lock lock(this->publisherMutex);
159  this->publishers.push_back(publisher);
160  this->publishersEnd = this->publishers.end();
161 
162  return publisher;
163  }
164 
172  public: template<typename M, typename T>
173  SubscriberPtr Subscribe(const std::string &_topic,
174  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
175  bool _latching = false)
176  {
177  SubscribeOptions ops;
178  std::string decodedTopic = this->DecodeTopicName(_topic);
179  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
180 
181  {
182  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
183  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
184  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
185  }
186 
187  SubscriberPtr result =
188  transport::TopicManager::Instance()->Subscribe(ops);
189 
190  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
191 
192  return result;
193  }
194 
201  public: template<typename M>
202  SubscriberPtr Subscribe(const std::string &_topic,
203  void(*_fp)(const boost::shared_ptr<M const> &),
204  bool _latching = false)
205  {
206  SubscribeOptions ops;
207  std::string decodedTopic = this->DecodeTopicName(_topic);
208  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
209 
210  {
211  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
212  this->callbacks[decodedTopic].push_back(
213  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
214  }
215 
216  SubscriberPtr result =
217  transport::TopicManager::Instance()->Subscribe(ops);
218 
219  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
220 
221  return result;
222  }
223 
231  template<typename T>
232  SubscriberPtr Subscribe(const std::string &_topic,
233  void(T::*_fp)(const std::string &), T *_obj,
234  bool _latching = false)
235  {
236  SubscribeOptions ops;
237  std::string decodedTopic = this->DecodeTopicName(_topic);
238  ops.Init(decodedTopic, shared_from_this(), _latching);
239 
240  {
241  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
242  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
243  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
244  }
245 
246  SubscriberPtr result =
247  transport::TopicManager::Instance()->Subscribe(ops);
248 
249  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
250 
251  return result;
252  }
253 
254 
261  SubscriberPtr Subscribe(const std::string &_topic,
262  void(*_fp)(const std::string &), bool _latching = false)
263  {
264  SubscribeOptions ops;
265  std::string decodedTopic = this->DecodeTopicName(_topic);
266  ops.Init(decodedTopic, shared_from_this(), _latching);
267 
268  {
269  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
270  this->callbacks[decodedTopic].push_back(
272  }
273 
274  SubscriberPtr result =
275  transport::TopicManager::Instance()->Subscribe(ops);
276 
277  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
278 
279  return result;
280  }
281 
286  public: bool HandleData(const std::string &_topic,
287  const std::string &_msg);
288 
293  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
294 
301  public: void InsertLatchedMsg(const std::string &_topic,
302  const std::string &_msg);
303 
310  public: void InsertLatchedMsg(const std::string &_topic,
311  MessagePtr _msg);
312 
316  public: std::string GetMsgType(const std::string &_topic) const;
317 
323  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
324 
325  private: std::string topicNamespace;
326  private: std::vector<PublisherPtr> publishers;
327  private: std::vector<PublisherPtr>::iterator publishersIter;
328  private: std::vector<PublisherPtr>::iterator publishersEnd;
329  private: static unsigned int idCounter;
330  private: unsigned int id;
331 
332  private: typedef std::list<CallbackHelperPtr> Callback_L;
333  private: typedef std::map<std::string, Callback_L> Callback_M;
334  private: Callback_M callbacks;
335  private: std::map<std::string, std::list<std::string> > incomingMsgs;
336 
338  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
339 
340  private: boost::recursive_mutex publisherMutex;
341  private: boost::recursive_mutex incomingMutex;
342 
343  private: bool initialized;
344  };
346  }
347 }
348 #endif