17 #ifndef _CONNECTION_HH_    18 #define _CONNECTION_HH_    21 #include <google/protobuf/message.h>    23 #include <boost/asio.hpp>    24 #include <boost/bind.hpp>    25 #include <boost/function.hpp>    26 #include <boost/thread.hpp>    27 #include <boost/tuple/tuple.hpp>    42 #define HEADER_LENGTH 8    57     class GZ_TRANSPORT_VISIBLE ConnectionReadTask : 
public tbb::task
    63       public: ConnectionReadTask(
    64                   boost::function<
void (
const std::string &)> _func,
    65                   const std::string &_data) :
    73       public: tbb::task *execute()
    75                 this->func(this->data);
    80       private: boost::function<void (const std::string &)> func;
    83       private: std::string data;
   102       public boost::enable_shared_from_this<Connection>
   114       public: 
bool Connect(
const std::string &_host, 
unsigned int _port);
   123       public: 
void Listen(
unsigned int _port, 
const AcceptCallback &_acceptCB);
   131       public: 
void StartRead(
const ReadCallback &_cb);
   134       public: 
void StopRead();
   137       public: 
void Shutdown();
   141       public: 
bool IsOpen() 
const;
   144       private: 
void Close();
   147       public: 
void Cancel();
   152       public: 
bool Read(std::string &_data);
   161       public: 
void EnqueueMsg(
const std::string &_buffer,
   162                   boost::function<
void(uint32_t)> _cb, uint32_t _id,
   163                   bool _force = 
false);
   169       public: 
void EnqueueMsg(
const std::string &_buffer, 
bool _force = 
false);
   173       public: std::string GetLocalURI() 
const;
   177       public: std::string GetRemoteURI() 
const;
   181       public: std::string GetLocalAddress() 
const;
   185       public: 
unsigned int GetLocalPort() 
const;
   189       public: std::string GetRemoteAddress() 
const;
   193       public: 
unsigned int GetRemotePort() 
const;
   197       public: std::string GetRemoteHostname() 
const;
   201       public: 
static std::string GetLocalHostname();
   205       public: 
template<
typename Handler>
   208                 boost::mutex::scoped_lock lock(this->socketMutex);
   211                   gzerr << 
"AsyncRead on a closed socket\n";
   215                 void (
Connection::*f)(
const boost::system::error_code &,
   216                     boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
   219                 boost::asio::async_read(*this->socket,
   220                     boost::asio::buffer(this->inboundHeader),
   222                                 boost::asio::placeholders::error,
   223                                 boost::make_tuple(_handler)));
   233       private: 
template<
typename Handler>
   234                void OnReadHeader(
const boost::system::error_code &_e,
   235                                  boost::tuple<Handler> _handler)
   239                   if (_e.value() == boost::asio::error::eof)
   240                     this->isOpen = 
false;
   244                   std::size_t inboundData_size = 0;
   245                   std::string header(&this->inboundHeader[0],
   246                                       this->inboundHeader.size());
   247                   this->inboundHeader.clear();
   249                   inboundData_size = this->ParseHeader(header);
   251                  if (inboundData_size > 0)
   254                     this->inboundData.resize(inboundData_size);
   256                     void (
Connection::*f)(
const boost::system::error_code &e,
   257                         boost::tuple<Handler>) =
   258                       &Connection::OnReadData<Handler>;
   260                     boost::asio::async_read(*this->socket,
   261                         boost::asio::buffer(this->inboundData),
   263                                     boost::asio::placeholders::error,
   268                     gzerr << 
"Header is empty\n";
   269                     boost::get<0>(_handler)(
"");
   293       private: 
template<
typename Handler>
   294                void OnReadData(
const boost::system::error_code &_e,
   295                               boost::tuple<Handler> _handler)
   299                   if (_e.value() == boost::asio::error::eof)
   300                     this->isOpen = 
false;
   304                 std::string data(&this->inboundData[0],
   305                                   this->inboundData.size());
   306                 this->inboundData.clear();
   309                   gzerr << 
"OnReadData got empty data!!!\n";
   313                   ConnectionReadTask *task = 
new(tbb::task::allocate_root())
   314                         ConnectionReadTask(boost::get<0>(_handler), data);
   315                   tbb::task::enqueue(*task);
   327               { 
return this->
shutdown.Connect(_subscriber); }
   330       public: 
void ProcessWriteQueue(
bool _blocking = 
false);
   334       public: 
unsigned int GetId() 
const;
   339       public: 
static bool ValidateIP(
const std::string &_ip);
   344       public: std::string GetIPWhiteList() 
const;
   348       private: 
void PostWrite();
   353       private: 
void OnWrite(
const boost::system::error_code &_e);
   357       private: 
void OnAccept(
const boost::system::error_code &_e);
   361       private: std::size_t ParseHeader(
const std::string &_header);
   364       private: 
void ReadLoop(
const ReadCallback &_cb);
   368       private: 
static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
   372       private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() 
const;
   376       private: 
static std::string GetHostname(
   377                    boost::asio::ip::tcp::endpoint _ep);
   382       private: 
void OnConnect(
const boost::system::error_code &_error,
   383                   boost::asio::ip::tcp::resolver::iterator _endPointIter);
   386       private: boost::asio::ip::tcp::socket *socket;
   389       private: boost::asio::ip::tcp::acceptor *acceptor;
   392       private: std::deque<std::string> writeQueue;
   396       private: std::deque< std::vector<
   397                std::pair<boost::function<void(uint32_t)>, uint32_t> > >
   401       private: boost::mutex connectMutex;
   404       private: boost::recursive_mutex writeMutex;
   407       private: boost::recursive_mutex readMutex;
   410       private: 
mutable boost::mutex socketMutex;
   413       private: boost::condition_variable connectCondition;
   416       private: AcceptCallback acceptCB;
   419       private: std::vector<char> inboundHeader;
   422       private: std::vector<char> inboundData;
   425       private: 
bool readQuit;
   428       private: 
unsigned int id;
   431       private: 
static unsigned int idCounter;
   434       private: ConnectionPtr acceptConn;
   443       private: 
unsigned int writeCount;
   446       private: std::string localURI;
   449       private: std::string localAddress;
   452       private: std::string remoteURI;
   455       private: std::string remoteAddress;
   458       private: 
bool connectError;
   461       private: std::string ipWhiteList;
   464       private: 
bool dropMsgLogged;
   467       private: 
bool isOpen;
 auto weakBind(Func _func, boost::shared_ptr< T > _ptr, Args... _args) -> decltype(details::makeWeakBinder(boost::bind(_func, _ptr.get(), _args...), boost::weak_ptr< T >(_ptr)))
Definition: WeakBind.hh:110
boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback. 
Definition: Connection.hh:117
Forward declarations for the common classes. 
Definition: Animation.hh:26
transport
Definition: ConnectionManager.hh:35
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback. 
Definition: Connection.hh:126
#define HEADER_LENGTH
Definition: Connection.hh:42
#define gzerr
Output an error message. 
Definition: Console.hh:50
Manages boost::asio IO. 
Definition: IOManager.hh:35
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down. 
Definition: Connection.hh:325
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:51
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation. 
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:134
#define NULL
Definition: CommonTypes.hh:31
void AsyncRead(Handler _handler)
Peform an asyncronous read param[in] _handler Callback to invoke on received data. 
Definition: Connection.hh:206
bool is_stopped()
Is the transport system stopped? 
Single TCP/IP connection manager. 
Definition: Connection.hh:101