All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef _NODE_HH_
19 #define _NODE_HH_
20 
21 #include <tbb/task.h>
22 #include <boost/enable_shared_from_this.hpp>
23 #include <map>
24 #include <list>
25 #include <string>
26 #include <vector>
27 
30 
31 namespace gazebo
32 {
33  namespace transport
34  {
37  class PublishTask : public tbb::task
38  {
42  public: PublishTask(transport::PublisherPtr _pub,
43  const google::protobuf::Message &_message)
44  : pub(_pub)
45  {
46  this->msg = _message.New();
47  this->msg->CopyFrom(_message);
48  }
49 
52  public: tbb::task *execute()
53  {
54  this->pub->WaitForConnection();
55  this->pub->Publish(*this->msg, true);
56  this->pub->SendMessage();
57  delete this->msg;
58  this->pub.reset();
59  return NULL;
60  }
61 
63  private: transport::PublisherPtr pub;
64 
66  private: google::protobuf::Message *msg;
67  };
69 
72 
76  class Node : public boost::enable_shared_from_this<Node>
77  {
79  public: Node();
80 
82  public: virtual ~Node();
83 
88  public: void Init(const std::string &_space ="");
89 
91  public: void Fini();
92 
95  public: std::string GetTopicNamespace() const;
96 
100  public: std::string DecodeTopicName(const std::string &_topic);
101 
105  public: std::string EncodeTopicName(const std::string &_topic);
106 
109  public: unsigned int GetId() const;
110 
113  public: void ProcessPublishers();
114 
116  public: void ProcessIncoming();
117 
121  public: bool HasLatchedSubscriber(const std::string &_topic) const;
122 
123 
130  public: template<typename M>
131  void Publish(const std::string &_topic,
132  const google::protobuf::Message &_message)
133  {
134  transport::PublisherPtr pub = this->Advertise<M>(_topic);
135  PublishTask *task = new(tbb::task::allocate_root())
136  PublishTask(pub, _message);
137 
138  tbb::task::enqueue(*task);
139  return;
140  }
141 
149  public: template<typename M>
150  transport::PublisherPtr Advertise(const std::string &_topic,
151  unsigned int _queueLimit = 1000,
152  double _hzRate = 0)
153  {
154  std::string decodedTopic = this->DecodeTopicName(_topic);
155  PublisherPtr publisher =
156  transport::TopicManager::Instance()->Advertise<M>(
157  decodedTopic, _queueLimit, _hzRate);
158 
159  boost::mutex::scoped_lock lock(this->publisherMutex);
160  publisher->SetNode(shared_from_this());
161  this->publishers.push_back(publisher);
162 
163  return publisher;
164  }
165 
173  public: template<typename M, typename T>
174  SubscriberPtr Subscribe(const std::string &_topic,
175  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
176  bool _latching = false)
177  {
178  SubscribeOptions ops;
179  std::string decodedTopic = this->DecodeTopicName(_topic);
180  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
181 
182  {
183  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
184  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
185  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
186  }
187 
188  SubscriberPtr result =
189  transport::TopicManager::Instance()->Subscribe(ops);
190 
191  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
192 
193  return result;
194  }
195 
202  public: template<typename M>
203  SubscriberPtr Subscribe(const std::string &_topic,
204  void(*_fp)(const boost::shared_ptr<M const> &),
205  bool _latching = false)
206  {
207  SubscribeOptions ops;
208  std::string decodedTopic = this->DecodeTopicName(_topic);
209  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
210 
211  {
212  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
213  this->callbacks[decodedTopic].push_back(
214  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
215  }
216 
217  SubscriberPtr result =
218  transport::TopicManager::Instance()->Subscribe(ops);
219 
220  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
221 
222  return result;
223  }
224 
232  template<typename T>
233  SubscriberPtr Subscribe(const std::string &_topic,
234  void(T::*_fp)(const std::string &), T *_obj,
235  bool _latching = false)
236  {
237  SubscribeOptions ops;
238  std::string decodedTopic = this->DecodeTopicName(_topic);
239  ops.Init(decodedTopic, shared_from_this(), _latching);
240 
241  {
242  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
243  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
244  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
245  }
246 
247  SubscriberPtr result =
248  transport::TopicManager::Instance()->Subscribe(ops);
249 
250  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
251 
252  return result;
253  }
254 
255 
262  SubscriberPtr Subscribe(const std::string &_topic,
263  void(*_fp)(const std::string &), bool _latching = false)
264  {
265  SubscribeOptions ops;
266  std::string decodedTopic = this->DecodeTopicName(_topic);
267  ops.Init(decodedTopic, shared_from_this(), _latching);
268 
269  {
270  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
271  this->callbacks[decodedTopic].push_back(
273  }
274 
275  SubscriberPtr result =
276  transport::TopicManager::Instance()->Subscribe(ops);
277 
278  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
279 
280  return result;
281  }
282 
287  public: bool HandleData(const std::string &_topic,
288  const std::string &_msg);
289 
294  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
295 
302  public: void InsertLatchedMsg(const std::string &_topic,
303  const std::string &_msg);
304 
311  public: void InsertLatchedMsg(const std::string &_topic,
312  MessagePtr _msg);
313 
317  public: std::string GetMsgType(const std::string &_topic) const;
318 
324  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
325 
326  private: std::string topicNamespace;
327  private: std::vector<PublisherPtr> publishers;
328  private: std::vector<PublisherPtr>::iterator publishersIter;
329  private: static unsigned int idCounter;
330  private: unsigned int id;
331 
332  private: typedef std::list<CallbackHelperPtr> Callback_L;
333  private: typedef std::map<std::string, Callback_L> Callback_M;
334  private: Callback_M callbacks;
335  private: std::map<std::string, std::list<std::string> > incomingMsgs;
336 
338  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
339 
340  private: boost::mutex publisherMutex;
341  private: boost::mutex publisherDeleteMutex;
342  private: boost::recursive_mutex incomingMutex;
343 
346  private: boost::recursive_mutex processIncomingMutex;
347 
348  private: bool initialized;
349  };
351  }
352 }
353 #endif