All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
TopicManager.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2011 Nate Koenig
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef TOPICMANAGER_HH
18 #define TOPICMANAGER_HH
19 
20 #include <boost/bind.hpp>
21 #include <map>
22 #include <list>
23 #include <string>
24 #include <vector>
25 
26 #include "common/Exception.hh"
27 #include "msgs/msgs.hh"
28 #include "common/SingletonT.hh"
29 
35 #include "transport/Publisher.hh"
36 #include "transport/Publication.hh"
37 #include "transport/Subscriber.hh"
38 
39 namespace gazebo
40 {
41  namespace transport
42  {
45 
47  class TopicManager : public SingletonT<TopicManager>
48  {
49  private: TopicManager();
50  private: virtual ~TopicManager();
51 
52  public: void Init();
53 
54  public: void Fini();
55 
56  public: PublicationPtr FindPublication(const std::string &topic);
57 
58  public: void AddNode(NodePtr _node);
59 
60  public: void RemoveNode(unsigned int _id);
61 
64  public: void ProcessNodes(bool _onlyOut = false);
65 
69  public: bool IsAdvertised(const std::string &_topic);
70 
72  public: SubscriberPtr Subscribe(const SubscribeOptions &options);
73 
76  public: void Unsubscribe(const std::string &_topic, const NodePtr &_sub);
77 
80  public: template<typename M>
81  PublisherPtr Advertise(const std::string &_topic,
82  unsigned int _queueLimit,
83  bool _latch)
84  {
85  google::protobuf::Message *msg = NULL;
86  M msgtype;
87  msg = dynamic_cast<google::protobuf::Message *>(&msgtype);
88  if (!msg)
89  gzthrow("Advertise requires a google protobuf type");
90 
91  this->UpdatePublications(_topic, msg->GetTypeName());
92 
93  PublisherPtr pub = PublisherPtr(new Publisher(_topic,
94  msg->GetTypeName(), _queueLimit, _latch));
95 
96  std::string msgTypename;
97  PublicationPtr publication;
98 
99  // Connect all local subscription to the publisher
100  for (int i = 0; i < 2; i ++)
101  {
102  std::string t;
103  if (i == 0)
104  {
105  t = _topic;
106  msgTypename = msg->GetTypeName();
107  }
108  else
109  {
110  t = _topic + "/__dbg";
111  msgs::GzString tmp;
112  msgTypename = tmp.GetTypeName();
113  }
114 
115  publication = this->FindPublication(t);
116  publication->AddPublisher(pub);
117  if (!publication->GetLocallyAdvertised())
118  {
119  ConnectionManager::Instance()->Advertise(t, msgTypename);
120  }
121 
122  publication->SetLocallyAdvertised(true);
123  pub->SetPublication(publication, i);
124 
125  SubNodeMap::iterator iter2;
126  SubNodeMap::iterator st_end2 = this->subscribedNodes.end();
127  for (iter2 = this->subscribedNodes.begin();
128  iter2 != st_end2; iter2++)
129  {
130  if (iter2->first == t)
131  {
132  std::list<NodePtr>::iterator liter;
133  std::list<NodePtr>::iterator l_end = iter2->second.end();
134  for (liter = iter2->second.begin();
135  liter != l_end; liter++)
136  {
137  publication->AddSubscription(*liter);
138  }
139  }
140  }
141  }
142 
143  return pub;
144  }
145 
147  public: void Unadvertise(const std::string &topic);
148 
154  public: void Publish(const std::string &topic,
155  const google::protobuf::Message &message,
156  const boost::function<void()> &cb = NULL);
157 
159  public: void ConnectPubToSub(const std::string &topic,
160  const SubscriptionTransportPtr &sublink);
161 
163  public: void ConnectSubToPub(const msgs::Publish &_pub);
164 
166  public: void DisconnectPubFromSub(const std::string &topic,
167  const std::string &host,
168  unsigned int port);
169 
171  public: void DisconnectSubFromPub(const std::string &topic,
172  const std::string &host,
173  unsigned int port);
174 
176  public: void ConnectSubscribers(const std::string &topic);
177 
180  public: PublicationPtr UpdatePublications(const std::string &topic,
181  const std::string &msgType);
182 
184  public: void RegisterTopicNamespace(const std::string &_name);
185 
187  public: void GetTopicNamespaces(std::list<std::string> &_namespaces);
188 
189  public: void ClearBuffers();
190 
191  public: void PauseIncoming(bool _pause);
192 
193  typedef std::map<std::string, std::list<NodePtr> > SubNodeMap;
194 
195  private: typedef std::map<std::string, PublicationPtr> PublicationPtr_M;
196  private: PublicationPtr_M advertisedTopics;
197  private: PublicationPtr_M::iterator advertisedTopicsEnd;
198  private: SubNodeMap subscribedNodes;
199  private: std::vector<NodePtr> nodes;
200 
201  private: boost::recursive_mutex *nodeMutex;
202 
203  private: bool pauseIncoming;
204 
205  // Singleton implementation
206  private: friend class SingletonT<TopicManager>;
207  };
209  }
210 }
211 #endif
212