Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
19 
20 #include <tbb/task.h>
21 #include <google/protobuf/message.h>
22 
23 #include <boost/asio.hpp>
24 #include <boost/bind.hpp>
25 #include <boost/function.hpp>
26 #include <boost/thread.hpp>
27 #include <boost/tuple/tuple.hpp>
28 
29 #include <string>
30 #include <vector>
31 #include <iostream>
32 #include <iomanip>
33 #include <deque>
34 #include <utility>
35 
36 #include "gazebo/common/Event.hh"
37 #include "gazebo/common/Console.hh"
40 #include "gazebo/util/system.hh"
41 
42 #define HEADER_LENGTH 8
43 
44 namespace gazebo
45 {
46  namespace transport
47  {
48  extern GZ_TRANSPORT_VISIBLE bool is_stopped();
49 
50  class IOManager;
51  class Connection;
52  typedef boost::shared_ptr<Connection> ConnectionPtr;
53 
57  class GZ_TRANSPORT_VISIBLE ConnectionReadTask : public tbb::task
58  {
63  public: ConnectionReadTask(
64  boost::function<void (const std::string &)> _func,
65  const std::string &_data) :
66  func(_func),
67  data(_data)
68  {
69  }
70 
73  public: tbb::task *execute()
74  {
75  this->func(this->data);
76  return NULL;
77  }
78 
80  private: boost::function<void (const std::string &)> func;
81 
83  private: std::string data;
84  };
86 
101  class GZ_TRANSPORT_VISIBLE Connection :
102  public boost::enable_shared_from_this<Connection>
103  {
105  public: Connection();
106 
108  public: virtual ~Connection();
109 
114  public: bool Connect(const std::string &_host, unsigned int _port);
115 
117  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
118 
123  public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
124 
126  typedef boost::function<void(const std::string &_data)> ReadCallback;
127 
131  public: void StartRead(const ReadCallback &_cb);
132 
134  public: void StopRead();
135 
137  public: void Shutdown();
138 
141  public: bool IsOpen() const;
142 
144  private: void Close();
145 
147  public: void Cancel();
148 
152  public: bool Read(std::string &_data);
153 
161  public: void EnqueueMsg(const std::string &_buffer,
162  boost::function<void(uint32_t)> _cb, uint32_t _id,
163  bool _force = false);
164 
169  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
170 
173  public: std::string GetLocalURI() const;
174 
177  public: std::string GetRemoteURI() const;
178 
181  public: std::string GetLocalAddress() const;
182 
185  public: unsigned int GetLocalPort() const;
186 
189  public: std::string GetRemoteAddress() const;
190 
193  public: unsigned int GetRemotePort() const;
194 
197  public: std::string GetRemoteHostname() const;
198 
201  public: static std::string GetLocalHostname();
202 
205  public: template<typename Handler>
206  void AsyncRead(Handler _handler)
207  {
208  boost::mutex::scoped_lock lock(this->socketMutex);
209  if (!this->IsOpen())
210  {
211  gzerr << "AsyncRead on a closed socket\n";
212  return;
213  }
214 
215  void (Connection::*f)(const boost::system::error_code &,
216  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
217 
218  this->inboundHeader.resize(HEADER_LENGTH);
219  boost::asio::async_read(*this->socket,
220  boost::asio::buffer(this->inboundHeader),
221  common::weakBind(f, this->shared_from_this(),
222  boost::asio::placeholders::error,
223  boost::make_tuple(_handler)));
224  }
225 
233  private: template<typename Handler>
234  void OnReadHeader(const boost::system::error_code &_e,
235  boost::tuple<Handler> _handler)
236  {
237  if (_e)
238  {
239  if (_e.value() == boost::asio::error::eof)
240  this->isOpen = false;
241  }
242  else
243  {
244  std::size_t inboundData_size = 0;
245  std::string header(&this->inboundHeader[0],
246  this->inboundHeader.size());
247  this->inboundHeader.clear();
248 
249  inboundData_size = this->ParseHeader(header);
250 
251  if (inboundData_size > 0)
252  {
253  // Start the asynchronous call to receive data
254  this->inboundData.resize(inboundData_size);
255 
256  void (Connection::*f)(const boost::system::error_code &e,
257  boost::tuple<Handler>) =
258  &Connection::OnReadData<Handler>;
259 
260  boost::asio::async_read(*this->socket,
261  boost::asio::buffer(this->inboundData),
262  common::weakBind(f, this->shared_from_this(),
263  boost::asio::placeholders::error,
264  _handler));
265  }
266  else
267  {
268  gzerr << "Header is empty\n";
269  boost::get<0>(_handler)("");
270  // This code tries to read the header again. We should
271  // never get here.
272  // this->inboundHeader.resize(HEADER_LENGTH);
273 
274  // void (Connection::*f)(const boost::system::error_code &,
275  // boost::tuple<Handler>) =
276  // &Connection::OnReadHeader<Handler>;
277 
278  // boost::asio::async_read(*this->socket,
279  // boost::asio::buffer(this->inboundHeader),
280  // common::weakBind(f, this->shared_from_this(),
281  // boost::asio::placeholders::error, _handler));
282  }
283  }
284  }
285 
293  private: template<typename Handler>
294  void OnReadData(const boost::system::error_code &_e,
295  boost::tuple<Handler> _handler)
296  {
297  if (_e)
298  {
299  if (_e.value() == boost::asio::error::eof)
300  this->isOpen = false;
301  }
302 
303  // Inform caller that data has been received
304  std::string data(&this->inboundData[0],
305  this->inboundData.size());
306  this->inboundData.clear();
307 
308  if (data.empty())
309  gzerr << "OnReadData got empty data!!!\n";
310 
311  if (!_e && !transport::is_stopped())
312  {
313  ConnectionReadTask *task = new(tbb::task::allocate_root())
314  ConnectionReadTask(boost::get<0>(_handler), data);
315  tbb::task::enqueue(*task);
316 
317  // Non-tbb version:
318  // boost::get<0>(_handler)(data);
319  }
320  }
321 
325  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
326  _subscriber)
327  { return this->shutdown.Connect(_subscriber); }
328 
330  public: void ProcessWriteQueue(bool _blocking = false);
331 
334  public: unsigned int GetId() const;
335 
339  public: static bool ValidateIP(const std::string &_ip);
340 
344  public: std::string GetIPWhiteList() const;
345 
348  private: void PostWrite();
349 
353  private: void OnWrite(const boost::system::error_code &_e);
354 
357  private: void OnAccept(const boost::system::error_code &_e);
358 
361  private: std::size_t ParseHeader(const std::string &_header);
362 
364  private: void ReadLoop(const ReadCallback &_cb);
365 
368  private: static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
369 
372  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
373 
376  private: static std::string GetHostname(
377  boost::asio::ip::tcp::endpoint _ep);
378 
382  private: void OnConnect(const boost::system::error_code &_error,
383  boost::asio::ip::tcp::resolver::iterator _endPointIter);
384 
386  private: boost::asio::ip::tcp::socket *socket;
387 
389  private: boost::asio::ip::tcp::acceptor *acceptor;
390 
392  private: std::deque<std::string> writeQueue;
393 
396  private: std::deque< std::vector<
397  std::pair<boost::function<void(uint32_t)>, uint32_t> > >
398  callbacks;
399 
401  private: boost::mutex connectMutex;
402 
404  private: boost::recursive_mutex writeMutex;
405 
407  private: boost::recursive_mutex readMutex;
408 
410  private: mutable boost::mutex socketMutex;
411 
413  private: boost::condition_variable connectCondition;
414 
416  private: AcceptCallback acceptCB;
417 
419  private: std::vector<char> inboundHeader;
420 
422  private: std::vector<char> inboundData;
423 
425  private: bool readQuit;
426 
428  private: unsigned int id;
429 
431  private: static unsigned int idCounter;
432 
434  private: ConnectionPtr acceptConn;
435 
438 
440  private: static IOManager *iomanager;
441 
443  private: unsigned int writeCount;
444 
446  private: std::string localURI;
447 
449  private: std::string localAddress;
450 
452  private: std::string remoteURI;
453 
455  private: std::string remoteAddress;
456 
458  private: bool connectError;
459 
461  private: std::string ipWhiteList;
462 
464  private: bool dropMsgLogged;
465 
467  private: bool isOpen;
468  };
470  }
471 }
472 #endif
auto weakBind(Func _func, boost::shared_ptr< T > _ptr, Args... _args) -> decltype(details::makeWeakBinder(boost::bind(_func, _ptr.get(), _args...), boost::weak_ptr< T >(_ptr)))
Definition: WeakBind.hh:110
boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback.
Definition: Connection.hh:117
Forward declarations for the common classes.
Definition: Animation.hh:26
transport
Definition: ConnectionManager.hh:35
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback.
Definition: Connection.hh:126
#define HEADER_LENGTH
Definition: Connection.hh:42
#define gzerr
Output an error message.
Definition: Console.hh:50
Manages boost::asio IO.
Definition: IOManager.hh:35
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down.
Definition: Connection.hh:325
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:51
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation.
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:134
#define NULL
Definition: CommonTypes.hh:31
void AsyncRead(Handler _handler)
Perform an asynchronous read. param[in] _handler: Callback to invoke on received data.
Definition: Connection.hh:206
bool is_stopped()
Is the transport system stopped?
Single TCP/IP connection manager.
Definition: Connection.hh:101