All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef _NODE_HH_
19 #define _NODE_HH_
20 
21 #include <boost/enable_shared_from_this.hpp>
22 #include <map>
23 #include <list>
24 #include <string>
25 #include <vector>
26 
29 
30 namespace gazebo
31 {
32  namespace transport
33  {
36 
40  class Node : public boost::enable_shared_from_this<Node>
41  {
43  public: Node();
44 
46  public: virtual ~Node();
47 
52  public: void Init(const std::string &_space ="");
53 
55  public: void Fini();
56 
59  public: std::string GetTopicNamespace() const;
60 
64  public: std::string DecodeTopicName(const std::string &_topic);
65 
69  public: std::string EncodeTopicName(const std::string &_topic);
70 
73  public: unsigned int GetId() const;
74 
77  public: void ProcessPublishers();
78 
80  public: void ProcessIncoming();
81 
85  public: bool HasLatchedSubscriber(const std::string &_topic) const;
86 
94  public: template<typename M>
95  transport::PublisherPtr Advertise(const std::string &_topic,
96  unsigned int _queueLimit = 1000,
97  double _hzRate = 0)
98  {
99  std::string decodedTopic = this->DecodeTopicName(_topic);
100  PublisherPtr publisher =
101  transport::TopicManager::Instance()->Advertise<M>(
102  decodedTopic, _queueLimit, _hzRate);
103 
104  boost::recursive_mutex::scoped_lock lock(this->publisherMutex);
105  this->publishers.push_back(publisher);
106  this->publishersEnd = this->publishers.end();
107 
108  return publisher;
109  }
110 
118  public: template<typename M, typename T>
119  SubscriberPtr Subscribe(const std::string &_topic,
120  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
121  bool _latching = false)
122  {
123  SubscribeOptions ops;
124  std::string decodedTopic = this->DecodeTopicName(_topic);
125  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
126 
127  {
128  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
129  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
130  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
131  }
132 
133  SubscriberPtr result =
134  transport::TopicManager::Instance()->Subscribe(ops);
135 
136  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
137 
138  return result;
139  }
140 
147  public: template<typename M>
148  SubscriberPtr Subscribe(const std::string &_topic,
149  void(*_fp)(const boost::shared_ptr<M const> &),
150  bool _latching = false)
151  {
152  SubscribeOptions ops;
153  std::string decodedTopic = this->DecodeTopicName(_topic);
154  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
155 
156  {
157  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
158  this->callbacks[decodedTopic].push_back(
159  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
160  }
161 
162  SubscriberPtr result =
163  transport::TopicManager::Instance()->Subscribe(ops);
164 
165  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
166 
167  return result;
168  }
169 
177  template<typename T>
178  SubscriberPtr Subscribe(const std::string &_topic,
179  void(T::*_fp)(const std::string &), T *_obj,
180  bool _latching = false)
181  {
182  SubscribeOptions ops;
183  std::string decodedTopic = this->DecodeTopicName(_topic);
184  ops.Init(decodedTopic, shared_from_this(), _latching);
185 
186  {
187  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
188  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
189  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
190  }
191 
192  SubscriberPtr result =
193  transport::TopicManager::Instance()->Subscribe(ops);
194 
195  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
196 
197  return result;
198  }
199 
200 
207  SubscriberPtr Subscribe(const std::string &_topic,
208  void(*_fp)(const std::string &), bool _latching = false)
209  {
210  SubscribeOptions ops;
211  std::string decodedTopic = this->DecodeTopicName(_topic);
212  ops.Init(decodedTopic, shared_from_this(), _latching);
213 
214  {
215  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
216  this->callbacks[decodedTopic].push_back(
218  }
219 
220  SubscriberPtr result =
221  transport::TopicManager::Instance()->Subscribe(ops);
222 
223  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
224 
225  return result;
226  }
227 
232  public: bool HandleData(const std::string &_topic,
233  const std::string &_msg);
234 
241  public: void InsertLatchedMsg(const std::string &_topic,
242  const std::string &_msg);
243 
247  public: std::string GetMsgType(const std::string &_topic) const;
248 
254  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
255 
256  private: std::string topicNamespace;
257  private: std::vector<PublisherPtr> publishers;
258  private: std::vector<PublisherPtr>::iterator publishersIter;
259  private: std::vector<PublisherPtr>::iterator publishersEnd;
260  private: static unsigned int idCounter;
261  private: unsigned int id;
262 
263  private: typedef std::list<CallbackHelperPtr> Callback_L;
264  private: typedef std::map<std::string, Callback_L> Callback_M;
265  private: Callback_M callbacks;
266  private: std::map<std::string, std::list<std::string> > incomingMsgs;
267  private: boost::recursive_mutex publisherMutex;
268  private: boost::recursive_mutex incomingMutex;
269 
270  private: bool initialized;
271  };
273  }
274 }
275 #endif