All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TopicManager.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _TOPICMANAGER_HH_
18 #define _TOPICMANAGER_HH_
19 
20 #include <boost/bind.hpp>
21 #include <map>
22 #include <list>
23 #include <string>
24 #include <vector>
25 
26 #include "common/Exception.hh"
27 #include "msgs/msgs.hh"
28 #include "common/SingletonT.hh"
29 
35 #include "transport/Publisher.hh"
36 #include "transport/Publication.hh"
37 #include "transport/Subscriber.hh"
38 
39 namespace gazebo
40 {
41  namespace transport
42  {
45 
48  class TopicManager : public SingletonT<TopicManager>
49  {
50  private: TopicManager();
51  private: virtual ~TopicManager();
52 
54  public: void Init();
55 
57  public: void Fini();
58 
62  public: PublicationPtr FindPublication(const std::string &_topic);
63 
66  public: void AddNode(NodePtr _node);
67 
70  public: void RemoveNode(unsigned int _id);
71 
75  public: void ProcessNodes(bool _onlyOut = false);
76 
80  public: bool IsAdvertised(const std::string &_topic);
81 
85  public: SubscriberPtr Subscribe(const SubscribeOptions &_options);
86 
91  public: void Unsubscribe(const std::string &_topic, const NodePtr &_sub);
92 
98  public: template<typename M>
99  PublisherPtr Advertise(const std::string &_topic,
100  unsigned int _queueLimit)
101  {
102  google::protobuf::Message *msg = NULL;
103  M msgtype;
104  msg = dynamic_cast<google::protobuf::Message *>(&msgtype);
105  if (!msg)
106  gzthrow("Advertise requires a google protobuf type");
107 
108  this->UpdatePublications(_topic, msg->GetTypeName());
109 
110  PublisherPtr pub = PublisherPtr(new Publisher(_topic,
111  msg->GetTypeName(), _queueLimit));
112 
113  std::string msgTypename;
114  PublicationPtr publication;
115 
116  // Connect all local subscription to the publisher
117  for (int i = 0; i < 2; i ++)
118  {
119  std::string t;
120  if (i == 0)
121  {
122  t = _topic;
123  msgTypename = msg->GetTypeName();
124  }
125  else
126  {
127  t = _topic + "/__dbg";
128  msgs::GzString tmp;
129  msgTypename = tmp.GetTypeName();
130  }
131 
132  publication = this->FindPublication(t);
133  publication->AddPublisher(pub);
134  if (!publication->GetLocallyAdvertised())
135  {
136  ConnectionManager::Instance()->Advertise(t, msgTypename);
137  }
138 
139  publication->SetLocallyAdvertised(true);
140  pub->SetPublication(publication, i);
141 
142  SubNodeMap::iterator iter2;
143  SubNodeMap::iterator st_end2 = this->subscribedNodes.end();
144  for (iter2 = this->subscribedNodes.begin();
145  iter2 != st_end2; iter2++)
146  {
147  if (iter2->first == t)
148  {
149  std::list<NodePtr>::iterator liter;
150  std::list<NodePtr>::iterator l_end = iter2->second.end();
151  for (liter = iter2->second.begin();
152  liter != l_end; liter++)
153  {
154  publication->AddSubscription(*liter);
155  }
156  }
157  }
158  }
159 
160  return pub;
161  }
162 
165  public: void Unadvertise(const std::string &_topic);
166 
172  public: void Publish(const std::string &_topic,
173  const google::protobuf::Message &_message,
174  const boost::function<void()> &_cb = NULL);
175 
179  public: void ConnectPubToSub(const std::string &_topic,
180  const SubscriptionTransportPtr &_sublink);
181 
184  public: void ConnectSubToPub(const msgs::Publish &_pub);
185 
190  public: void DisconnectPubFromSub(const std::string &_topic,
191  const std::string &_host,
192  unsigned int _port);
193 
198  public: void DisconnectSubFromPub(const std::string &_topic,
199  const std::string &_host,
200  unsigned int _port);
201 
204  public: void ConnectSubscribers(const std::string &_topic);
205 
211  public: PublicationPtr UpdatePublications(const std::string &_topic,
212  const std::string &_msgType);
213 
216  public: void RegisterTopicNamespace(const std::string &_name);
217 
220  public: void GetTopicNamespaces(std::list<std::string> &_namespaces);
221 
226  public: std::map<std::string, std::list<std::string> >
228 
230  public: void ClearBuffers();
231 
234  public: void PauseIncoming(bool _pause);
235 
237  typedef std::map<std::string, std::list<NodePtr> > SubNodeMap;
238 
239  private: typedef std::map<std::string, PublicationPtr> PublicationPtr_M;
240  private: PublicationPtr_M advertisedTopics;
241  private: PublicationPtr_M::iterator advertisedTopicsEnd;
242  private: SubNodeMap subscribedNodes;
243  private: std::vector<NodePtr> nodes;
244 
245  private: boost::recursive_mutex *nodeMutex;
246 
247  private: bool pauseIncoming;
248 
249  // Singleton implementation
250  private: friend class SingletonT<TopicManager>;
251  };
252 
254  }
255 }
256 #endif