All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
19 
20 #include <tbb/task.h>
21 #include <google/protobuf/message.h>
22 
23 #include <boost/asio.hpp>
24 #include <boost/bind.hpp>
25 #include <boost/function.hpp>
26 #include <boost/thread.hpp>
27 #include <boost/tuple/tuple.hpp>
28 
29 #include <string>
30 #include <vector>
31 #include <iostream>
32 #include <iomanip>
33 #include <deque>
34 #include <utility>
35 
36 #include "gazebo/common/Event.hh"
37 #include "gazebo/common/Console.hh"
39 
40 #define HEADER_LENGTH 8
41 
42 namespace gazebo
43 {
44  namespace transport
45  {
46  extern bool is_stopped();
47 
48  class IOManager;
49  class Connection;
50  typedef boost::shared_ptr<Connection> ConnectionPtr;
51 
55  class ConnectionReadTask : public tbb::task
56  {
62  boost::function<void (const std::string &)> _func,
63  const std::string &_data)
64  {
65  this->func = _func;
66  this->data = _data;
67  }
68 
71  public: tbb::task *execute()
72  {
73  this->func(this->data);
74  return NULL;
75  }
76 
78  private: boost::function<void (const std::string &)> func;
79 
81  private: std::string data;
82  };
84 
99  class Connection : public boost::enable_shared_from_this<Connection>
100  {
102  public: Connection();
103 
105  public: virtual ~Connection();
106 
111  public: bool Connect(const std::string &_host, unsigned int _port);
112 
114  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
115 
120  public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
121 
123  typedef boost::function<void(const std::string &_data)> ReadCallback;
124 
128  public: void StartRead(const ReadCallback &_cb);
129 
131  public: void StopRead();
132 
134  public: void Shutdown();
135 
138  public: bool IsOpen() const;
139 
141  private: void Close();
142 
144  public: void Cancel();
145 
149  public: bool Read(std::string &_data);
150 
158  public: void EnqueueMsg(const std::string &_buffer,
159  boost::function<void(uint32_t)> _cb, uint32_t _id,
160  bool _force = false);
161 
166  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
167 
170  public: std::string GetLocalURI() const;
171 
174  public: std::string GetRemoteURI() const;
175 
178  public: std::string GetLocalAddress() const;
179 
182  public: unsigned int GetLocalPort() const;
183 
186  public: std::string GetRemoteAddress() const;
187 
190  public: unsigned int GetRemotePort() const;
191 
194  public: std::string GetRemoteHostname() const;
195 
198  public: static std::string GetLocalHostname();
199 
202  public: template<typename Handler>
203  void AsyncRead(Handler _handler)
204  {
205  if (!this->IsOpen())
206  {
207  gzerr << "AsyncRead on a closed socket\n";
208  return;
209  }
210 
211  void (Connection::*f)(const boost::system::error_code &,
212  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
213 
214  this->inboundHeader.resize(HEADER_LENGTH);
215  boost::asio::async_read(*this->socket,
216  boost::asio::buffer(this->inboundHeader),
217  boost::bind(f, this,
218  boost::asio::placeholders::error,
219  boost::make_tuple(_handler)));
220  }
221 
229  private: template<typename Handler>
230  void OnReadHeader(const boost::system::error_code &_e,
231  boost::tuple<Handler> _handler)
232  {
233  if (_e)
234  {
235  if (_e.message() == "End of file")
236  this->isOpen = false;
237  }
238  else
239  {
240  std::size_t inboundData_size = 0;
241  std::string header(&this->inboundHeader[0],
242  this->inboundHeader.size());
243  this->inboundHeader.clear();
244 
245  inboundData_size = this->ParseHeader(header);
246 
247  if (inboundData_size > 0)
248  {
249  // Start the asynchronous call to receive data
250  this->inboundData.resize(inboundData_size);
251 
252  void (Connection::*f)(const boost::system::error_code &e,
253  boost::tuple<Handler>) =
254  &Connection::OnReadData<Handler>;
255 
256  boost::asio::async_read(*this->socket,
257  boost::asio::buffer(this->inboundData),
258  boost::bind(f, this,
259  boost::asio::placeholders::error,
260  _handler));
261  }
262  else
263  {
264  gzerr << "Header is empty\n";
265  boost::get<0>(_handler)("");
266  // This code tries to read the header again. We should
267  // never get here.
268  // this->inboundHeader.resize(HEADER_LENGTH);
269 
270  // void (Connection::*f)(const boost::system::error_code &,
271  // boost::tuple<Handler>) =
272  // &Connection::OnReadHeader<Handler>;
273 
274  // boost::asio::async_read(*this->socket,
275  // boost::asio::buffer(this->inboundHeader),
276  // boost::bind(f, this,
277  // boost::asio::placeholders::error, _handler));
278  }
279  }
280  }
281 
289  private: template<typename Handler>
290  void OnReadData(const boost::system::error_code &_e,
291  boost::tuple<Handler> _handler)
292  {
293  if (_e)
294  {
295  if (_e.message() == "End of file")
296  this->isOpen = false;
297  }
298 
299  // Inform caller that data has been received
300  std::string data(&this->inboundData[0],
301  this->inboundData.size());
302  this->inboundData.clear();
303 
304  if (data.empty())
305  gzerr << "OnReadData got empty data!!!\n";
306 
307  if (!_e && !transport::is_stopped())
308  {
309  ConnectionReadTask *task = new(tbb::task::allocate_root())
310  ConnectionReadTask(boost::get<0>(_handler), data);
311  tbb::task::enqueue(*task);
312 
313  // Non-tbb version:
314  // boost::get<0>(_handler)(data);
315  }
316  }
317 
321  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
322  _subscriber)
323  { return this->shutdown.Connect(_subscriber); }
324 
328  public: void DisconnectShutdown(event::ConnectionPtr _subscriber)
329  {this->shutdown.Disconnect(_subscriber);}
330 
332  public: void ProcessWriteQueue(bool _blocking = false);
333 
336  public: unsigned int GetId() const;
337 
341  public: static bool ValidateIP(const std::string &_ip);
342 
346  public: std::string GetIPWhiteList() const;
347 
351  private: void OnWrite(const boost::system::error_code &_e);
352 
355  private: void OnAccept(const boost::system::error_code &_e);
356 
359  private: std::size_t ParseHeader(const std::string &_header);
360 
362  private: void ReadLoop(const ReadCallback &_cb);
363 
366  private: static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
367 
370  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
371 
374  private: static std::string GetHostname(
375  boost::asio::ip::tcp::endpoint _ep);
376 
380  private: void OnConnect(const boost::system::error_code &_error,
381  boost::asio::ip::tcp::resolver::iterator _endPointIter);
382 
384  private: boost::asio::ip::tcp::socket *socket;
385 
387  private: boost::asio::ip::tcp::acceptor *acceptor;
388 
390  private: std::deque<std::string> writeQueue;
391 
394  private: std::deque<
395  std::pair<boost::function<void(uint32_t)>, uint32_t> > callbacks;
396 
398  private: boost::mutex connectMutex;
399 
401  private: boost::recursive_mutex writeMutex;
402 
404  private: boost::recursive_mutex readMutex;
405 
407  private: mutable boost::mutex socketMutex;
408 
410  private: boost::condition_variable connectCondition;
411 
413  private: AcceptCallback acceptCB;
414 
416  private: std::vector<char> inboundHeader;
417 
419  private: std::vector<char> inboundData;
420 
422  private: bool readQuit;
423 
425  private: unsigned int id;
426 
428  private: static unsigned int idCounter;
429 
431  private: ConnectionPtr acceptConn;
432 
434  private: event::EventT<void()> shutdown;
435 
437  private: static IOManager *iomanager;
438 
440  private: unsigned int writeCount;
441 
443  private: std::string localURI;
444 
446  private: std::string localAddress;
447 
449  private: std::string remoteURI;
450 
452  private: std::string remoteAddress;
453 
455  private: bool connectError;
456 
458  private: std::string ipWhiteList;
459 
461  private: char *headerBuffer;
462 
464  private: bool dropMsgLogged;
465 
468  private: unsigned int callbackIndex;
469 
471  private: bool isOpen;
472  };
474  }
475 }
476 #endif