17 #ifndef _TOPICMANAGER_HH_
18 #define _TOPICMANAGER_HH_
20 #include <boost/bind.hpp>
25 #include <boost/unordered/unordered_set.hpp>
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably registers a node with the manager -- confirm
/// against the implementation.
/// \param[in] _node Shared node handle, taken by value.
69 public:
void AddNode(
NodePtr _node);
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably removes the node identified by _id -- confirm
/// against the implementation.
/// \param[in] _id Unsigned integer identifier of a node.
73 public:
void RemoveNode(
unsigned int _id);
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably services the registered nodes -- confirm
/// against the implementation.
/// \param[in] _onlyOut Defaults to false. NOTE(review): presumably
/// restricts processing to outgoing messages when true -- confirm.
78 public:
void ProcessNodes(
bool _onlyOut =
false);
/// \brief Declaration only; the definition is not part of this header.
/// \param[in] _topic Topic name to query.
/// \return A bool. NOTE(review): presumably true when the topic has been
/// advertised -- confirm against the implementation.
83 public:
bool IsAdvertised(
const std::string &_topic);
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably detaches a subscribing node from a topic --
/// confirm against the implementation.
/// \param[in] _topic Topic name.
/// \param[in] _sub Node handle, passed by const reference.
94 public:
void Unsubscribe(
const std::string &_topic,
const NodePtr &_sub);
/// \brief Inline member template, parameterised on a message type M.
/// NOTE(review): only part of this definition is visible; the signature
/// line, the braces and several statements are missing. The comments
/// below describe only the statements that are visible.
/// \param[in] _queueLimit Forwarded together with the message type name
/// and _hzRate in the call fragment further down.
103 public:
template<
typename M>
105 unsigned int _queueLimit,
// Obtain a protobuf Message view of the message object via dynamic_cast.
108 google::protobuf::Message *msg =
NULL;
110 msg =
dynamic_cast<google::protobuf::Message *
>(&msgtype);
// NOTE(review): the condition guarding this throw is not visible;
// presumably it fires when the cast above yields NULL -- confirm.
112 gzthrow(
"Advertise requires a google protobuf type");
// Pass the topic and the protobuf type name to UpdatePublications.
114 this->UpdatePublications(_topic, msg->GetTypeName());
117 msg->GetTypeName(), _queueLimit, _hzRate));
119 std::string msgTypename;
123 msgTypename = msg->GetTypeName();
// Look the publication up by topic; the assert treats a NULL result as a
// programming error.
125 publication = this->FindPublication(_topic);
126 GZ_ASSERT(publication !=
NULL,
"FindPublication returned NULL");
// Attach the publisher object to the publication.
128 publication->AddPublisher(pub);
// NOTE(review): the statement(s) controlled by this check are not visible
// in this excerpt.
129 if (!publication->GetLocallyAdvertised())
134 publication->SetLocallyAdvertised(
true);
135 pub->SetPublication(publication);
// Walk every entry of subscribedNodes; for entries whose key equals
// _topic, add each node in the entry's list as a subscription of the
// publication.
138 SubNodeMap::iterator iter2;
139 SubNodeMap::iterator stEnd2 = this->subscribedNodes.end();
140 for (iter2 = this->subscribedNodes.begin();
141 iter2 != stEnd2; ++iter2)
143 if (iter2->first == _topic)
145 std::list<NodePtr>::iterator liter;
146 std::list<NodePtr>::iterator lEnd = iter2->second.end();
147 for (liter = iter2->second.begin();
148 liter != lEnd; ++liter)
150 publication->AddSubscription(*liter);
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably withdraws a previously advertised topic --
/// confirm against the implementation.
/// \param[in] _topic Topic name.
160 public:
void Unadvertise(
const std::string &_topic);
/// \brief Declaration only; the definition is not part of this header.
/// \param[in] _topic Topic name.
/// \param[in] _message Message handle, taken by value.
/// \param[in] _cb Callable accepting a single uint32_t.
/// NOTE(review): presumably invoked once the message has been handled --
/// confirm against the implementation.
/// \param[in] _id A uint32_t value. NOTE(review): presumably the value
/// handed to _cb -- confirm.
172 public:
void Publish(
const std::string &_topic,
MessagePtr _message,
173 boost::function<
void(uint32_t)> _cb, uint32_t _id);
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably links a publisher to a subscriber for a
/// topic -- confirm against the implementation.
/// \param[in] _topic Topic name.
178 public:
void ConnectPubToSub(
const std::string &_topic,
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably connects local subscribers to the publisher
/// described by _pub -- confirm against the implementation.
/// \param[in] _pub A msgs::Publish message, passed by const reference.
183 public:
void ConnectSubToPub(
const msgs::Publish &_pub);
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably severs a publisher-to-subscriber link --
/// confirm against the implementation.
/// \param[in] _topic Topic name.
/// \param[in] _host Host string. NOTE(review): presumably the remote
/// peer's address -- confirm.
189 public:
void DisconnectPubFromSub(
const std::string &_topic,
190 const std::string &_host,
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably severs a subscriber-to-publisher link --
/// confirm against the implementation.
/// \param[in] _topic Topic name.
/// \param[in] _host Host string. NOTE(review): presumably the remote
/// peer's address -- confirm.
197 public:
void DisconnectSubFromPub(
const std::string &_topic,
198 const std::string &_host,
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably connects all subscribers of a topic --
/// confirm against the implementation.
/// \param[in] _topic Topic name.
203 public:
void ConnectSubscribers(
const std::string &_topic);
/// \brief Declaration only; the definition is not part of this header.
/// The inline advertise code in this header calls it with a topic and a
/// protobuf type name, then expects FindPublication on the same topic to
/// return a non-NULL publication.
/// \param[in] _topic Topic name.
/// \param[in] _msgType Message type name.
/// \return A PublicationPtr. NOTE(review): presumably the publication
/// for _topic -- confirm against the implementation.
210 public:
PublicationPtr UpdatePublications(
const std::string &_topic,
211 const std::string &_msgType);
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably records a topic namespace -- confirm against
/// the implementation.
/// \param[in] _name Namespace name.
215 public:
void RegisterTopicNamespace(
const std::string &_name);
/// \brief Declaration only; the definition is not part of this header.
/// \param _namespaces List of strings taken by non-const reference.
/// NOTE(review): presumably an output parameter filled with the known
/// namespaces -- confirm against the implementation.
219 public:
void GetTopicNamespaces(std::list<std::string> &_namespaces);
/// \brief Declaration only; takes no arguments and returns nothing.
/// NOTE(review): which buffers are cleared cannot be determined from this
/// header -- see the implementation.
222 public:
void ClearBuffers();
/// \brief Declaration only; the definition is not part of this header.
/// \param[in] _pause Boolean switch. NOTE(review): presumably true pauses
/// handling of incoming messages and false resumes it -- confirm.
226 public:
void PauseIncoming(
bool _pause);
/// \brief Declaration only; the definition is not part of this header.
/// NOTE(review): presumably queues a node for later processing --
/// confirm against the implementation.
/// \param[in] _ptr Shared node handle, taken by value.
230 public:
void AddNodeToProcess(
NodePtr _ptr);
/// \brief Ordered map from a string key to a list of node handles. The
/// inline advertise code in this header compares its keys against a
/// topic name.
233 typedef std::map<std::string, std::list<NodePtr> >
SubNodeMap;
/// \brief Ordered map from a string key to a publication handle.
/// NOTE(review): keys are presumably topic names -- confirm.
235 private:
typedef std::map<std::string, PublicationPtr> PublicationPtr_M;
/// \brief Publications stored in a PublicationPtr_M map.
236 private: PublicationPtr_M advertisedTopics;
/// \brief Iterator into a PublicationPtr_M.
/// NOTE(review): presumably a cached end() of advertisedTopics -- confirm.
237 private: PublicationPtr_M::iterator advertisedTopicsEnd;
/// \brief Sequence of node handles.
239 private: std::vector<NodePtr> nodes;
/// \brief Hash set of node handles.
/// NOTE(review): presumably the nodes awaiting processing -- confirm.
242 private: boost::unordered_set<NodePtr> nodesToProcess;
/// \brief Recursive mutex.
/// NOTE(review): presumably guards the node containers -- confirm.
244 private: boost::recursive_mutex nodeMutex;
/// \brief Non-recursive mutex.
/// NOTE(review): presumably guards subscriber bookkeeping -- confirm.
247 private: boost::mutex subscriberMutex;
/// \brief Non-recursive mutex.
/// NOTE(review): presumably serialises node processing -- confirm.
250 private: boost::mutex processNodesMutex;
/// \brief Boolean flag.
/// NOTE(review): presumably set through PauseIncoming -- confirm.
252 private:
bool pauseIncoming;