All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef _NODE_HH_
19 #define _NODE_HH_
20 
21 #include <boost/enable_shared_from_this.hpp>
22 #include <map>
23 #include <list>
24 #include <string>
25 #include <vector>
26 
29 
30 namespace gazebo
31 {
32  namespace transport
33  {
36 
40  class Node : public boost::enable_shared_from_this<Node>
41  {
43  public: Node();
44 
46  public: virtual ~Node();
47 
52  public: void Init(const std::string &_space ="");
53 
55  public: void Fini();
56 
59  public: std::string GetTopicNamespace() const;
60 
64  public: std::string DecodeTopicName(const std::string &_topic);
65 
69  public: std::string EncodeTopicName(const std::string &_topic);
70 
73  public: unsigned int GetId() const;
74 
77  public: void ProcessPublishers();
78 
80  public: void ProcessIncoming();
81 
85  public: bool HasLatchedSubscriber(const std::string &_topic) const;
86 
92  public: template<typename M>
93  transport::PublisherPtr Advertise(const std::string &_topic,
94  unsigned int _queueLimit = 1000)
95  {
96  std::string decodedTopic = this->DecodeTopicName(_topic);
97  PublisherPtr publisher =
98  transport::TopicManager::Instance()->Advertise<M>(
99  decodedTopic, _queueLimit);
100 
101  boost::recursive_mutex::scoped_lock lock(this->publisherMutex);
102  this->publishers.push_back(publisher);
103  this->publishersEnd = this->publishers.end();
104 
105  return publisher;
106  }
107 
115  public: template<typename M, typename T>
116  SubscriberPtr Subscribe(const std::string &_topic,
117  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
118  bool _latching = false)
119  {
120  SubscribeOptions ops;
121  std::string decodedTopic = this->DecodeTopicName(_topic);
122  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
123 
124  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
125  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
126  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
127 
128  SubscriberPtr result =
129  transport::TopicManager::Instance()->Subscribe(ops);
130 
131  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
132 
133  return result;
134  }
135 
142  public: template<typename M>
143  SubscriberPtr Subscribe(const std::string &_topic,
144  void(*_fp)(const boost::shared_ptr<M const> &),
145  bool _latching = false)
146  {
147  SubscribeOptions ops;
148  std::string decodedTopic = this->DecodeTopicName(_topic);
149  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
150 
151  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
152  this->callbacks[decodedTopic].push_back(
153  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
154 
155  SubscriberPtr result =
156  transport::TopicManager::Instance()->Subscribe(ops);
157 
158  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
159 
160  return result;
161  }
162 
170  template<typename T>
171  SubscriberPtr Subscribe(const std::string &_topic,
172  void(T::*_fp)(const std::string &), T *_obj,
173  bool _latching = false)
174  {
175  SubscribeOptions ops;
176  std::string decodedTopic = this->DecodeTopicName(_topic);
177  ops.Init(decodedTopic, shared_from_this(), _latching);
178 
179  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
180  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
181  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
182 
183  SubscriberPtr result =
184  transport::TopicManager::Instance()->Subscribe(ops);
185 
186  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
187 
188  return result;
189  }
190 
191 
198  SubscriberPtr Subscribe(const std::string &_topic,
199  void(*_fp)(const std::string &), bool _latching = false)
200  {
201  SubscribeOptions ops;
202  std::string decodedTopic = this->DecodeTopicName(_topic);
203  ops.Init(decodedTopic, shared_from_this(), _latching);
204 
205  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
206  this->callbacks[decodedTopic].push_back(
208 
209  SubscriberPtr result =
210  transport::TopicManager::Instance()->Subscribe(ops);
211 
212  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
213 
214  return result;
215  }
216 
221  public: bool HandleData(const std::string &_topic,
222  const std::string &_msg);
223 
230  public: void InsertLatchedMsg(const std::string &_topic,
231  const std::string &_msg);
232 
236  public: std::string GetMsgType(const std::string &_topic) const;
237 
243  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
244 
245  private: std::string topicNamespace;
246  private: std::vector<PublisherPtr> publishers;
247  private: std::vector<PublisherPtr>::iterator publishersIter;
248  private: std::vector<PublisherPtr>::iterator publishersEnd;
249  private: static unsigned int idCounter;
250  private: unsigned int id;
251 
252  private: typedef std::list<CallbackHelperPtr> Callback_L;
253  private: typedef std::map<std::string, Callback_L> Callback_M;
254  private: Callback_M callbacks;
255  private: std::map<std::string, std::list<std::string> > incomingMsgs;
256  private: boost::recursive_mutex publisherMutex;
257  private: boost::recursive_mutex incomingMutex;
258 
259  private: bool initialized;
260  };
262  }
263 }
264 #endif