All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TopicManager.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _TOPICMANAGER_HH_
18 #define _TOPICMANAGER_HH_
19 
20 #include <boost/bind.hpp>
21 #include <map>
22 #include <list>
23 #include <string>
24 #include <vector>
25 #include <boost/unordered/unordered_set.hpp>
26 
27 #include "gazebo/common/Assert.hh"
29 #include "gazebo/msgs/msgs.hh"
31 
40 
41 namespace gazebo
42 {
43  namespace transport
44  {
47 
50  class TopicManager : public SingletonT<TopicManager>
51  {
52  private: TopicManager();
53  private: virtual ~TopicManager();
54 
56  public: void Init();
57 
59  public: void Fini();
60 
64  public: PublicationPtr FindPublication(const std::string &_topic);
65 
68  public: void AddNode(NodePtr _node);
69 
72  public: void RemoveNode(unsigned int _id);
73 
77  public: void ProcessNodes(bool _onlyOut = false);
78 
82  public: bool IsAdvertised(const std::string &_topic);
83 
87  public: SubscriberPtr Subscribe(const SubscribeOptions &_options);
88 
93  public: void Unsubscribe(const std::string &_topic, const NodePtr &_sub);
94 
102  public: template<typename M>
103  PublisherPtr Advertise(const std::string &_topic,
104  unsigned int _queueLimit,
105  double _hzRate)
106  {
107  google::protobuf::Message *msg = NULL;
108  M msgtype;
109  msg = dynamic_cast<google::protobuf::Message *>(&msgtype);
110  if (!msg)
111  gzthrow("Advertise requires a google protobuf type");
112 
113  this->UpdatePublications(_topic, msg->GetTypeName());
114 
115  PublisherPtr pub = PublisherPtr(new Publisher(_topic,
116  msg->GetTypeName(), _queueLimit, _hzRate));
117 
118  std::string msgTypename;
119  PublicationPtr publication;
120 
121  // Connect all local subscription to the publisher
122  msgTypename = msg->GetTypeName();
123 
124  publication = this->FindPublication(_topic);
125  GZ_ASSERT(publication != NULL, "FindPublication returned NULL");
126 
127  publication->AddPublisher(pub);
128  if (!publication->GetLocallyAdvertised())
129  {
130  ConnectionManager::Instance()->Advertise(_topic, msgTypename);
131  }
132 
133  publication->SetLocallyAdvertised(true);
134  pub->SetPublication(publication);
135 
136 
137  SubNodeMap::iterator iter2;
138  SubNodeMap::iterator stEnd2 = this->subscribedNodes.end();
139  for (iter2 = this->subscribedNodes.begin();
140  iter2 != stEnd2; ++iter2)
141  {
142  if (iter2->first == _topic)
143  {
144  std::list<NodePtr>::iterator liter;
145  std::list<NodePtr>::iterator lEnd = iter2->second.end();
146  for (liter = iter2->second.begin();
147  liter != lEnd; ++liter)
148  {
149  publication->AddSubscription(*liter);
150  }
151  }
152  }
153 
154  return pub;
155  }
156 
159  public: void Unadvertise(const std::string &_topic);
160 
167  public: void Publish(const std::string &_topic, MessagePtr _message,
168  boost::function<void(uint32_t)> _cb, uint32_t _id);
169 
173  public: void ConnectPubToSub(const std::string &_topic,
174  const SubscriptionTransportPtr _sublink);
175 
178  public: void ConnectSubToPub(const msgs::Publish &_pub);
179 
184  public: void DisconnectPubFromSub(const std::string &_topic,
185  const std::string &_host,
186  unsigned int _port);
187 
192  public: void DisconnectSubFromPub(const std::string &_topic,
193  const std::string &_host,
194  unsigned int _port);
195 
198  public: void ConnectSubscribers(const std::string &_topic);
199 
205  public: PublicationPtr UpdatePublications(const std::string &_topic,
206  const std::string &_msgType);
207 
210  public: void RegisterTopicNamespace(const std::string &_name);
211 
214  public: void GetTopicNamespaces(std::list<std::string> &_namespaces);
215 
217  public: void ClearBuffers();
218 
221  public: void PauseIncoming(bool _pause);
222 
225  public: void AddNodeToProcess(NodePtr _ptr);
226 
228  typedef std::map<std::string, std::list<NodePtr> > SubNodeMap;
229 
230  private: typedef std::map<std::string, PublicationPtr> PublicationPtr_M;
231  private: PublicationPtr_M advertisedTopics;
232  private: PublicationPtr_M::iterator advertisedTopicsEnd;
233  private: SubNodeMap subscribedNodes;
234  private: std::vector<NodePtr> nodes;
235 
237  private: boost::unordered_set<NodePtr> nodesToProcess;
238 
239  private: boost::recursive_mutex nodeMutex;
240 
242  private: boost::mutex subscriberMutex;
243 
245  private: boost::mutex processNodesMutex;
246 
247  private: bool pauseIncoming;
248 
249  // Singleton implementation
250  private: friend class SingletonT<TopicManager>;
251  };
253  }
254 }
255 #endif