TopicManager.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012-2015 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _TOPICMANAGER_HH_
18 #define _TOPICMANAGER_HH_
19 
20 #include <boost/bind.hpp>
21 #include <map>
22 #include <list>
23 #include <string>
24 #include <vector>
25 #include <boost/unordered/unordered_set.hpp>
26 
27 #include "gazebo/common/Assert.hh"
29 #include "gazebo/msgs/msgs.hh"
31 
40 #include "gazebo/util/system.hh"
41 
42 namespace gazebo
43 {
44  namespace transport
45  {
48 
51  class GAZEBO_VISIBLE TopicManager : public SingletonT<TopicManager>
52  {
53  private: TopicManager();
54  private: virtual ~TopicManager();
55 
57  public: void Init();
58 
60  public: void Fini();
61 
65  public: PublicationPtr FindPublication(const std::string &_topic);
66 
69  public: void AddNode(NodePtr _node);
70 
73  public: void RemoveNode(unsigned int _id);
74 
78  public: void ProcessNodes(bool _onlyOut = false);
79 
83  public: bool IsAdvertised(const std::string &_topic);
84 
88  public: SubscriberPtr Subscribe(const SubscribeOptions &_options);
89 
94  public: void Unsubscribe(const std::string &_topic, const NodePtr &_sub);
95 
103  public: template<typename M>
104  PublisherPtr Advertise(const std::string &_topic,
105  unsigned int _queueLimit,
106  double _hzRate)
107  {
108  google::protobuf::Message *msg = NULL;
109  M msgtype;
110  msg = dynamic_cast<google::protobuf::Message *>(&msgtype);
111  if (!msg)
112  gzthrow("Advertise requires a google protobuf type");
113 
114  this->UpdatePublications(_topic, msg->GetTypeName());
115 
116  PublisherPtr pub = PublisherPtr(new Publisher(_topic,
117  msg->GetTypeName(), _queueLimit, _hzRate));
118 
119  std::string msgTypename;
120  PublicationPtr publication;
121 
122  // Connect all local subscription to the publisher
123  msgTypename = msg->GetTypeName();
124 
125  publication = this->FindPublication(_topic);
126  GZ_ASSERT(publication != NULL, "FindPublication returned NULL");
127 
128  publication->AddPublisher(pub);
129  if (!publication->GetLocallyAdvertised())
130  {
131  ConnectionManager::Instance()->Advertise(_topic, msgTypename);
132  }
133 
134  publication->SetLocallyAdvertised(true);
135  pub->SetPublication(publication);
136 
137 
138  SubNodeMap::iterator iter2;
139  SubNodeMap::iterator stEnd2 = this->subscribedNodes.end();
140  for (iter2 = this->subscribedNodes.begin();
141  iter2 != stEnd2; ++iter2)
142  {
143  if (iter2->first == _topic)
144  {
145  std::list<NodePtr>::iterator liter;
146  std::list<NodePtr>::iterator lEnd = iter2->second.end();
147  for (liter = iter2->second.begin();
148  liter != lEnd; ++liter)
149  {
150  publication->AddSubscription(*liter);
151  }
152  }
153  }
154 
155  return pub;
156  }
157 
160  public: void Unadvertise(const std::string &_topic);
161 
164  public: void Unadvertise(PublisherPtr _pub);
165 
172  public: void Publish(const std::string &_topic, MessagePtr _message,
173  boost::function<void(uint32_t)> _cb, uint32_t _id);
174 
178  public: void ConnectPubToSub(const std::string &_topic,
179  const SubscriptionTransportPtr _sublink);
180 
183  public: void ConnectSubToPub(const msgs::Publish &_pub);
184 
189  public: void DisconnectPubFromSub(const std::string &_topic,
190  const std::string &_host,
191  unsigned int _port);
192 
197  public: void DisconnectSubFromPub(const std::string &_topic,
198  const std::string &_host,
199  unsigned int _port);
200 
203  public: void ConnectSubscribers(const std::string &_topic);
204 
210  public: PublicationPtr UpdatePublications(const std::string &_topic,
211  const std::string &_msgType);
212 
215  public: void RegisterTopicNamespace(const std::string &_name);
216 
219  public: void GetTopicNamespaces(std::list<std::string> &_namespaces);
220 
222  public: void ClearBuffers();
223 
226  public: void PauseIncoming(bool _pause);
227 
230  public: void AddNodeToProcess(NodePtr _ptr);
231 
233  typedef std::map<std::string, std::list<NodePtr> > SubNodeMap;
234 
235  private: typedef std::map<std::string, PublicationPtr> PublicationPtr_M;
236  private: PublicationPtr_M advertisedTopics;
237  private: PublicationPtr_M::iterator advertisedTopicsEnd;
238  private: SubNodeMap subscribedNodes;
239  private: std::vector<NodePtr> nodes;
240 
242  private: boost::unordered_set<NodePtr> nodesToProcess;
243 
244  private: boost::recursive_mutex nodeMutex;
245 
247  private: boost::mutex subscriberMutex;
248 
250  private: boost::mutex processNodesMutex;
251 
252  private: bool pauseIncoming;
253 
254  // Singleton implementation
255  private: friend class SingletonT<TopicManager>;
256  };
258  }
259 }
260 #endif
boost::shared_ptr< SubscriptionTransport > SubscriptionTransportPtr
Definition: TransportTypes.hh:69
Options for a subscription.
Definition: SubscribeOptions.hh:35
static ConnectionManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
#define GZ_ASSERT(_expr, _msg)
This macro define the standard way of launching an exception inside gazebo.
Definition: Assert.hh:24
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
#define gzthrow(msg)
This macro logs an error to the throw stream and throws an exception that contains the file name and ...
Definition: Exception.hh:39
Forward declarations for transport.
Singleton template class.
Definition: SingletonT.hh:33
PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit, double _hzRate)
Advertise on a topic.
Definition: TopicManager.hh:104
Manages topics and their subscriptions.
Definition: TopicManager.hh:51
A publisher of messages on a topic.
Definition: Publisher.hh:43
boost::shared_ptr< Publication > PublicationPtr
Definition: TransportTypes.hh:61
std::map< std::string, std::list< NodePtr > > SubNodeMap
A map of string->list of Node pointers.
Definition: TopicManager.hh:233
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
#define NULL
Definition: CommonTypes.hh:30
boost::shared_ptr< Node > NodePtr
Definition: TransportTypes.hh:57
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
#define GAZEBO_VISIBLE
Use to represent "symbol visible" if supported.
Definition: system.hh:48