All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012-2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef _NODE_HH_
19 #define _NODE_HH_
20 
21 #include <tbb/task.h>
22 #include <boost/enable_shared_from_this.hpp>
23 #include <map>
24 #include <list>
25 #include <string>
26 #include <vector>
27 
30 #include "gazebo/util/system.hh"
31 
32 namespace gazebo
33 {
34  namespace transport
35  {
38  class GAZEBO_VISIBLE PublishTask : public tbb::task
39  {
43  public: PublishTask(transport::PublisherPtr _pub,
44  const google::protobuf::Message &_message)
45  : pub(_pub)
46  {
47  this->msg = _message.New();
48  this->msg->CopyFrom(_message);
49  }
50 
53  public: tbb::task *execute()
54  {
55  this->pub->WaitForConnection();
56  this->pub->Publish(*this->msg, true);
57  this->pub->SendMessage();
58  delete this->msg;
59  this->pub.reset();
60  return NULL;
61  }
62 
64  private: transport::PublisherPtr pub;
65 
67  private: google::protobuf::Message *msg;
68  };
70 
73 
77  class GAZEBO_VISIBLE Node : public boost::enable_shared_from_this<Node>
78  {
80  public: Node();
81 
83  public: virtual ~Node();
84 
89  public: void Init(const std::string &_space ="");
90 
92  public: void Fini();
93 
96  public: std::string GetTopicNamespace() const;
97 
101  public: std::string DecodeTopicName(const std::string &_topic);
102 
106  public: std::string EncodeTopicName(const std::string &_topic);
107 
110  public: unsigned int GetId() const;
111 
114  public: void ProcessPublishers();
115 
117  public: void ProcessIncoming();
118 
122  public: bool HasLatchedSubscriber(const std::string &_topic) const;
123 
124 
131  public: template<typename M>
132  void Publish(const std::string &_topic,
133  const google::protobuf::Message &_message)
134  {
135  transport::PublisherPtr pub = this->Advertise<M>(_topic);
136  PublishTask *task = new(tbb::task::allocate_root())
137  PublishTask(pub, _message);
138 
139  tbb::task::enqueue(*task);
140  return;
141  }
142 
150  public: template<typename M>
151  transport::PublisherPtr Advertise(const std::string &_topic,
152  unsigned int _queueLimit = 1000,
153  double _hzRate = 0)
154  {
155  std::string decodedTopic = this->DecodeTopicName(_topic);
156  PublisherPtr publisher =
157  transport::TopicManager::Instance()->Advertise<M>(
158  decodedTopic, _queueLimit, _hzRate);
159 
160  boost::mutex::scoped_lock lock(this->publisherMutex);
161  publisher->SetNode(shared_from_this());
162  this->publishers.push_back(publisher);
163 
164  return publisher;
165  }
166 
174  public: template<typename M, typename T>
175  SubscriberPtr Subscribe(const std::string &_topic,
176  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
177  bool _latching = false)
178  {
179  SubscribeOptions ops;
180  std::string decodedTopic = this->DecodeTopicName(_topic);
181  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
182 
183  {
184  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
185  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
186  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
187  }
188 
189  SubscriberPtr result =
190  transport::TopicManager::Instance()->Subscribe(ops);
191 
192  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
193 
194  return result;
195  }
196 
203  public: template<typename M>
204  SubscriberPtr Subscribe(const std::string &_topic,
205  void(*_fp)(const boost::shared_ptr<M const> &),
206  bool _latching = false)
207  {
208  SubscribeOptions ops;
209  std::string decodedTopic = this->DecodeTopicName(_topic);
210  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
211 
212  {
213  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
214  this->callbacks[decodedTopic].push_back(
215  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
216  }
217 
218  SubscriberPtr result =
219  transport::TopicManager::Instance()->Subscribe(ops);
220 
221  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
222 
223  return result;
224  }
225 
233  template<typename T>
234  SubscriberPtr Subscribe(const std::string &_topic,
235  void(T::*_fp)(const std::string &), T *_obj,
236  bool _latching = false)
237  {
238  SubscribeOptions ops;
239  std::string decodedTopic = this->DecodeTopicName(_topic);
240  ops.Init(decodedTopic, shared_from_this(), _latching);
241 
242  {
243  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
244  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
245  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
246  }
247 
248  SubscriberPtr result =
249  transport::TopicManager::Instance()->Subscribe(ops);
250 
251  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
252 
253  return result;
254  }
255 
256 
263  SubscriberPtr Subscribe(const std::string &_topic,
264  void(*_fp)(const std::string &), bool _latching = false)
265  {
266  SubscribeOptions ops;
267  std::string decodedTopic = this->DecodeTopicName(_topic);
268  ops.Init(decodedTopic, shared_from_this(), _latching);
269 
270  {
271  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
272  this->callbacks[decodedTopic].push_back(
274  }
275 
276  SubscriberPtr result =
277  transport::TopicManager::Instance()->Subscribe(ops);
278 
279  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
280 
281  return result;
282  }
283 
288  public: bool HandleData(const std::string &_topic,
289  const std::string &_msg);
290 
295  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
296 
303  public: void InsertLatchedMsg(const std::string &_topic,
304  const std::string &_msg);
305 
312  public: void InsertLatchedMsg(const std::string &_topic,
313  MessagePtr _msg);
314 
318  public: std::string GetMsgType(const std::string &_topic) const;
319 
325  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
326 
327  private: std::string topicNamespace;
328  private: std::vector<PublisherPtr> publishers;
329  private: std::vector<PublisherPtr>::iterator publishersIter;
330  private: static unsigned int idCounter;
331  private: unsigned int id;
332 
333  private: typedef std::list<CallbackHelperPtr> Callback_L;
334  private: typedef std::map<std::string, Callback_L> Callback_M;
335  private: Callback_M callbacks;
336  private: std::map<std::string, std::list<std::string> > incomingMsgs;
337 
339  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
340 
341  private: boost::mutex publisherMutex;
342  private: boost::mutex publisherDeleteMutex;
343  private: boost::recursive_mutex incomingMutex;
344 
347  private: boost::recursive_mutex processIncomingMutex;
348 
349  private: bool initialized;
350  };
352  }
353 }
354 #endif