TopicManager.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef GAZEBO_TRANSPORT_TOPICMANAGER_HH_
18 #define GAZEBO_TRANSPORT_TOPICMANAGER_HH_
19 
20 #include <boost/bind.hpp>
21 #include <boost/function.hpp>
22 #include <map>
23 #include <list>
24 #include <string>
25 #include <vector>
26 #include <boost/unordered/unordered_set.hpp>
27 
28 #include "gazebo/common/Assert.hh"
30 #include "gazebo/msgs/msgs.hh"
32 
41 #include "gazebo/util/system.hh"
42 
44 GZ_SINGLETON_DECLARE(GZ_TRANSPORT_VISIBLE, gazebo, transport, TopicManager)
45 
46 namespace gazebo
47 {
48  namespace transport
49  {
52 
55  class GZ_TRANSPORT_VISIBLE TopicManager : public SingletonT<TopicManager>
56  {
57  private: TopicManager();
58  private: virtual ~TopicManager();
59 
61  public: void Init();
62 
64  public: void Fini();
65 
69  public: PublicationPtr FindPublication(const std::string &_topic);
70 
73  public: void AddNode(NodePtr _node);
74 
77  public: void RemoveNode(unsigned int _id);
78 
82  public: void ProcessNodes(bool _onlyOut = false);
83 
87  public: SubscriberPtr Subscribe(const SubscribeOptions &_options);
88 
93  public: void Unsubscribe(const std::string &_topic, const NodePtr &_sub);
94 
102  public: PublisherPtr Advertise(const std::string &_topic,
103  const std::string &_msgTypeName,
104  unsigned int _queueLimit,
105  double _hzRate)
106  {
107  this->UpdatePublications(_topic, _msgTypeName);
108 
109  PublisherPtr pub = PublisherPtr(new Publisher(_topic,
110  _msgTypeName, _queueLimit, _hzRate));
111 
112  // Connect all local subscription to the publisher
113  PublicationPtr publication = this->FindPublication(_topic);
114  GZ_ASSERT(publication != nullptr,
115  "FindPublication returned nullptr");
116 
117  publication->AddPublisher(pub);
118  if (!publication->GetLocallyAdvertised())
119  {
120  ConnectionManager::Instance()->Advertise(_topic,
121  _msgTypeName);
122  }
123 
124  publication->SetLocallyAdvertised(true);
125  pub->SetPublication(publication);
126 
127  for (auto &iter2 : this->subscribedNodes)
128  {
129  if (iter2.first == _topic)
130  {
131  for (const auto liter : iter2.second)
132  {
133  publication->AddSubscription(liter);
134  }
135  }
136  }
137 
138  return pub;
139  }
140 
148  public: template<typename M>
149  PublisherPtr Advertise(const std::string &_topic,
150  unsigned int _queueLimit,
151  double _hzRate)
152  {
153  google::protobuf::Message *msg = nullptr;
154  M msgtype;
155  msg = dynamic_cast<google::protobuf::Message *>(&msgtype);
156  if (!msg)
157  gzthrow("Advertise requires a google protobuf type");
158 
159  return this->Advertise(_topic, msg->GetTypeName(), _queueLimit,
160  _hzRate);
161  }
162 
165  public: void Unadvertise(const std::string &_topic);
166 
169  public: void Unadvertise(PublisherPtr _pub);
170 
175  public: void Unadvertise(const std::string &_topic, const uint32_t _id);
176 
183  public: void Publish(const std::string &_topic, MessagePtr _message,
184  boost::function<void(uint32_t)> _cb, uint32_t _id);
185 
189  public: void ConnectPubToSub(const std::string &_topic,
190  const SubscriptionTransportPtr _sublink);
191 
194  public: void ConnectSubToPub(const msgs::Publish &_pub);
195 
200  public: void DisconnectPubFromSub(const std::string &_topic,
201  const std::string &_host,
202  unsigned int _port);
203 
208  public: void DisconnectSubFromPub(const std::string &_topic,
209  const std::string &_host,
210  unsigned int _port);
211 
214  public: void ConnectSubscribers(const std::string &_topic);
215 
221  public: PublicationPtr UpdatePublications(const std::string &_topic,
222  const std::string &_msgType);
223 
226  public: void RegisterTopicNamespace(const std::string &_name);
227 
230  public: void GetTopicNamespaces(std::list<std::string> &_namespaces);
231 
233  public: void ClearBuffers();
234 
237  public: void PauseIncoming(bool _pause);
238 
241  public: void AddNodeToProcess(NodePtr _ptr);
242 
244  typedef std::map<std::string, std::list<NodePtr> > SubNodeMap;
245 
246  private: typedef std::map<std::string, PublicationPtr> PublicationPtr_M;
247  private: PublicationPtr_M advertisedTopics;
248  private: PublicationPtr_M::iterator advertisedTopicsEnd;
249  private: SubNodeMap subscribedNodes;
250  private: std::vector<NodePtr> nodes;
251 
253  private: boost::unordered_set<NodePtr> nodesToProcess;
254 
255  private: boost::recursive_mutex nodeMutex;
256 
258  private: boost::mutex subscriberMutex;
259 
261  private: boost::mutex processNodesMutex;
262 
263  private: bool pauseIncoming;
264 
265  // Singleton implementation
266  private: friend class SingletonT<TopicManager>;
267  };
269  }
270 }
271 #endif
Options for a subscription.
Definition: SubscribeOptions.hh:35
#define GZ_ASSERT(_expr, _msg)
This macro define the standard way of launching an exception inside gazebo.
Definition: Assert.hh:24
Forward declarations for the common classes.
Definition: Animation.hh:26
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
#define gzthrow(msg)
This macro logs an error to the throw stream and throws an exception that contains the file name and ...
Definition: Exception.hh:39
Forward declarations for transport.
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
Singleton template class.
Definition: SingletonT.hh:33
boost::shared_ptr< Node > NodePtr
Definition: TransportTypes.hh:57
boost::shared_ptr< Publication > PublicationPtr
Definition: TransportTypes.hh:61
boost::shared_ptr< SubscriptionTransport > SubscriptionTransportPtr
Definition: TransportTypes.hh:69
Manages topics and their subscriptions.
Definition: TopicManager.hh:55
A publisher of messages on a topic.
Definition: Publisher.hh:44
PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit, double _hzRate)
Advertise on a topic.
Definition: TopicManager.hh:102
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
transport
Definition: TopicManager.hh:44
PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit, double _hzRate)
Advertise on a topic.
Definition: TopicManager.hh:149
std::map< std::string, std::list< NodePtr > > SubNodeMap
A map of string->list of Node pointers.
Definition: TopicManager.hh:244
#define GZ_SINGLETON_DECLARE(visibility, n1, n2, singletonType)
Helper to declare typed SingletonT.
Definition: SingletonT.hh:61
static ConnectionManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36