17 #ifndef _CONNECTION_HH_ 
   18 #define _CONNECTION_HH_ 
   21 #include <google/protobuf/message.h> 
   23 #include <boost/asio.hpp> 
   24 #include <boost/bind.hpp> 
   25 #include <boost/function.hpp> 
   26 #include <boost/thread.hpp> 
   27 #include <boost/tuple/tuple.hpp> 
   /// \brief Header length constant (value 8).
   /// NOTE(review): presumably the number of characters in the size header
   /// that precedes each message payload; its uses are not visible in this
   /// excerpt -- confirm against ParseHeader.
   40 #define HEADER_LENGTH 8 
   /// \brief TBB task that hands a received data string to a callback.
   /// execute() invokes the stored function with the stored string.
   55     class ConnectionReadTask : 
public tbb::task
 
   /// \brief Constructor.
   /// \param[in] _func Callback taking a const std::string reference.
   /// \param[in] _data String associated with the callback.
   /// NOTE(review): the constructor body is not visible in this excerpt;
   /// presumably it stores both arguments in func and data -- confirm.
   61       public: ConnectionReadTask(
 
   62                   boost::function<
void (
const std::string &)> _func,
 
   63                   const std::string &_data)
 
   /// \brief Task entry point: calls func with data.
   /// NOTE(review): the returned tbb::task pointer is not visible in this
   /// excerpt.
   71       public: tbb::task *execute()
 
   73                 this->func(this->data);
 
   /// \brief Callback invoked by execute().
   78       private: boost::function<void (const std::string &)> func;
 
   /// \brief String passed to func by execute().
   81       private: std::string data;
 
   /// \brief Connection class declaration.
   /// Inherits boost::enable_shared_from_this<Connection>. The members
   /// declared in this class include a boost::asio TCP socket and acceptor,
   /// a write queue, and the inbound header/data buffers used by the
   /// asynchronous read handlers.
   /// NOTE(review): the enable_shared_from_this base suggests instances are
   /// owned through boost::shared_ptr -- confirm against the callers.
   99     class Connection : 
public boost::enable_shared_from_this<Connection>
 
  /// \brief Connect to a host and port.
  /// \param[in] _host Host to connect to.
  /// \param[in] _port Port number on the host.
  /// \return Boolean result. NOTE(review): presumably true on success; the
  /// definition is not visible in this excerpt -- confirm.
  111       public: 
bool Connect(
const std::string &_host, 
unsigned int _port);
 
  /// \brief Const query returning a boolean.
  /// NOTE(review): presumably reports whether the connection is open
  /// (see the isOpen member); the definition is not visible -- confirm.
  138       public: 
bool IsOpen() 
const;
 
  /// \brief Private close operation; takes no arguments, returns nothing.
  /// NOTE(review): what is released is determined by the definition, which
  /// is not visible in this excerpt.
  141       private: 
void Close();
 
  /// \brief Read into a caller-supplied string.
  /// \param[out] _data Non-const string reference. NOTE(review): presumably
  /// receives the data that was read -- confirm against the definition.
  /// \return Boolean result. NOTE(review): presumably true on success.
  149       public: 
bool Read(std::string &_data);
 
  /// \brief Enqueue a message together with a callback and an id.
  /// \param[in] _buffer Message contents.
  /// \param[in] _cb Function taking a uint32_t.
  /// \param[in] _id A uint32_t value. NOTE(review): presumably the value
  /// handed to _cb once the message has been written -- confirm.
  /// \param[in] _force Defaults to false. NOTE(review): semantics are
  /// defined in the implementation, which is not visible in this excerpt.
  158       public: 
void EnqueueMsg(
const std::string &_buffer,
 
  159                   boost::function<
void(uint32_t)> _cb, uint32_t _id,
 
  160                   bool _force = 
false);
 
  /// \brief Enqueue a message without a completion callback.
  /// \param[in] _buffer Message contents.
  /// \param[in] _force Defaults to false. NOTE(review): semantics are
  /// defined in the implementation, which is not visible in this excerpt.
  166       public: 
void EnqueueMsg(
const std::string &_buffer, 
bool _force = 
false);
 
  /// \brief Start an asynchronous read of a message header.
  /// The line naming this function is not visible in this excerpt; the log
  /// message below identifies it as AsyncRead. The visible statements read
  /// from the socket into inboundHeader and pass the caller's handler along
  /// wrapped in a boost::tuple.
  /// \tparam Handler Type of the caller-supplied handler (_handler).
  202       public: 
template<
typename Handler>
 
  // Error logged for a closed socket. The condition guarding this statement
  // is not visible in this excerpt.
  207                   gzerr << 
"AsyncRead on a closed socket\n";
 
  // Pointer to the OnReadHeader instantiation for this Handler type.
  211                 void (
Connection::*f)(
const boost::system::error_code &,
 
  212                     boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
 
  // Read from the socket into inboundHeader. NOTE(review): the line that
  // binds f as the completion handler is not visible here; the arguments
  // below are the error placeholder and the handler wrapped in a tuple.
  215                 boost::asio::async_read(*this->socket,
 
  216                     boost::asio::buffer(this->inboundHeader),
 
  218                                 boost::asio::placeholders::error,
 
  219                                 boost::make_tuple(_handler)));
 
  /// \brief Handler run after a header read.
  /// Converts the contents of inboundHeader to a string, obtains the payload
  /// size from ParseHeader, and either starts a read of that many bytes into
  /// inboundData (completing in OnReadData) or reports an empty header.
  /// \tparam Handler Type of the caller-supplied handler.
  /// \param[in] _e Error code of the header read.
  /// \param[in] _handler Tuple whose first element is the caller's handler.
  229       private: 
template<
typename Handler>
 
  230                void OnReadHeader(
const boost::system::error_code &_e,
 
  231                                  boost::tuple<Handler> _handler)
 
  235                   if (_e.message() == 
"End of file")
 
  236                     this->isOpen = 
false;
 
  // Payload size announced by the header; assigned from ParseHeader below.
  240                   std::size_t inboundData_size = 0;
 
  // Copy the received header bytes into a string, then empty the buffer.
  241                   std::string header(&this->inboundHeader[0],
 
  242                                       this->inboundHeader.size());
 
  243                   this->inboundHeader.clear();
 
  245                   inboundData_size = this->ParseHeader(header);
 
  // A non-zero size means a payload follows the header.
  247                  if (inboundData_size > 0)
 
  // Size the payload buffer to exactly the announced length.
  250                     this->inboundData.resize(inboundData_size);
 
  // Pointer to the OnReadData instantiation for this Handler type.
  252                     void (
Connection::*f)(
const boost::system::error_code &e,
 
  253                         boost::tuple<Handler>) =
 
  254                       &Connection::OnReadData<Handler>;
 
  // Read the payload from the socket into inboundData. NOTE(review): the
  // bind line and the remaining arguments are not visible in this excerpt.
  256                     boost::asio::async_read(*this->socket,
 
  257                         boost::asio::buffer(this->inboundData),
 
  259                                     boost::asio::placeholders::error,
 
  // NOTE(review): presumably the else branch of the size check (the branch
  // line is not visible): log the problem and call the handler with an
  // empty string.
  264                     gzerr << 
"Header is empty\n";
 
  265                     boost::get<0>(_handler)(
"");
 
  /// \brief Handler run after a payload read.
  /// Copies inboundData into a string, clears the buffer, and enqueues a
  /// ConnectionReadTask that will call the caller's handler with the data.
  /// \tparam Handler Type of the caller-supplied handler.
  /// \param[in] _e Error code of the payload read.
  /// \param[in] _handler Tuple whose first element is the caller's handler.
  289       private: 
template<
typename Handler>
 
  290                void OnReadData(
const boost::system::error_code &_e,
 
  291                               boost::tuple<Handler> _handler)
 
  295                   if (_e.message() == 
"End of file")
 
  296                     this->isOpen = 
false;
 
  // Copy the received payload bytes into a string, then empty the buffer.
  300                 std::string data(&this->inboundData[0],
 
  301                                   this->inboundData.size());
 
  302                 this->inboundData.clear();
 
  // Error logged for an empty payload. The condition guarding this
  // statement is not visible in this excerpt.
  305                   gzerr << 
"OnReadData got empty data!!!\n";
 
  // Allocate a root TBB task carrying the caller's handler and the data,
  // then enqueue it; the task's execute() invokes the handler with the data.
  309                   ConnectionReadTask *task = 
new(tbb::task::allocate_root())
 
  310                         ConnectionReadTask(boost::get<0>(_handler), data);
 
  311                   tbb::task::enqueue(*task);
 
  // Inline body: forwards _subscriber to shutdown.Connect and returns its
  // result. The declaration this body belongs to is not visible in this
  // excerpt.
  323               { 
return this->shutdown.
Connect(_subscriber); }
 
  /// \brief Const accessor returning an unsigned int identifier.
  /// NOTE(review): presumably returns the id member -- confirm against the
  /// definition, which is not visible in this excerpt.
  336       public: 
unsigned int GetId() 
const;
 
  /// \brief Static check of an IP string.
  /// \param[in] _ip The IP string to check.
  /// \return Boolean result. NOTE(review): the validation criteria are in
  /// the definition, which is not visible in this excerpt.
  341       public: 
static bool ValidateIP(
const std::string &_ip);
 
  /// \brief Private handler receiving an error code.
  /// \param[in] _e Error code passed to the handler.
  /// NOTE(review): presumably the completion handler of an asynchronous
  /// write -- confirm against the definition.
  351       private: 
void OnWrite(
const boost::system::error_code &_e);
 
  /// \brief Private handler receiving an error code.
  /// \param[in] _e Error code passed to the handler.
  /// NOTE(review): presumably the completion handler of an asynchronous
  /// accept on the acceptor member -- confirm against the definition.
  355       private: 
void OnAccept(
const boost::system::error_code &_e);
 
  /// \brief Parse a header string into a size.
  /// \param[in] _header Header contents.
  /// \return A size; OnReadHeader uses it as the number of payload bytes to
  /// read, and treats zero as an empty header.
  359       private: std::size_t ParseHeader(
const std::string &_header);
 
  /// \brief Static helper returning a TCP endpoint.
  /// NOTE(review): how the local endpoint is chosen is determined by the
  /// definition, which is not visible in this excerpt.
  366       private: 
static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
 
  /// \brief Const accessor returning a TCP endpoint.
  /// NOTE(review): presumably the peer endpoint of the socket member --
  /// confirm against the definition.
  370       private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() 
const;
 
  /// \brief Static helper mapping a TCP endpoint to a string.
  /// \param[in] _ep Endpoint to describe.
  /// \return A string. NOTE(review): presumably the host name or address of
  /// _ep -- confirm against the definition.
  374       private: 
static std::string GetHostname(
 
  375                    boost::asio::ip::tcp::endpoint _ep);
 
  /// \brief Private handler receiving an error code and a resolver iterator.
  /// \param[in] _error Error code passed to the handler.
  /// \param[in] _endPointIter Resolver iterator passed to the handler.
  /// NOTE(review): presumably the completion handler of an asynchronous
  /// connect started by Connect -- confirm against the definition.
  380       private: 
void OnConnect(
const boost::system::error_code &_error,
 
  381                   boost::asio::ip::tcp::resolver::iterator _endPointIter);
 
  /// \brief TCP socket; the asynchronous reads in this class read from it.
  384       private: boost::asio::ip::tcp::socket *socket;
 
  /// \brief TCP acceptor pointer.
  387       private: boost::asio::ip::tcp::acceptor *acceptor;
 
  /// \brief Deque of strings. NOTE(review): presumably the outgoing messages
  /// queued by EnqueueMsg -- confirm.
  390       private: std::deque<std::string> writeQueue;
 
  /// \brief Callback storage; the start of this declaration is not visible
  /// in this excerpt. Each element pairs a void(uint32_t) function with a
  /// uint32_t, matching the _cb/_id parameters of EnqueueMsg.
  395                std::pair<boost::function<void(uint32_t)>, uint32_t> > callbacks;
 
  /// \brief Mutex. NOTE(review): presumably guards connection setup together
  /// with connectCondition -- confirm.
  398       private: boost::mutex connectMutex;
 
  /// \brief Recursive mutex. NOTE(review): presumably guards the write path.
  401       private: boost::recursive_mutex writeMutex;
 
  /// \brief Recursive mutex. NOTE(review): presumably guards the read path.
  404       private: boost::recursive_mutex readMutex;
 
  /// \brief Mutex declared mutable, so const member functions can lock it.
  407       private: 
mutable boost::mutex socketMutex;
 
  /// \brief Condition variable. NOTE(review): presumably signalled when a
  /// connection attempt finishes -- confirm.
  410       private: boost::condition_variable connectCondition;
 
  /// \brief Buffer receiving the header bytes; OnReadHeader copies it to a
  /// string and clears it.
  416       private: std::vector<char> inboundHeader;
 
  /// \brief Buffer receiving the payload bytes; resized in OnReadHeader, and
  /// copied to a string and cleared in OnReadData.
  419       private: std::vector<char> inboundData;
 
  /// \brief Boolean flag; its use is not visible in this excerpt.
  422       private: 
bool readQuit;
 
  /// \brief Unsigned identifier of this connection object.
  425       private: 
unsigned int id;
 
  /// \brief Static counter shared by all Connection objects.
  /// NOTE(review): presumably the source of the id values -- confirm.
  428       private: 
static unsigned int idCounter;
 
  /// \brief Unsigned counter; its use is not visible in this excerpt.
  440       private: 
unsigned int writeCount;
 
  /// \brief String for the local URI.
  443       private: std::string localURI;
 
  /// \brief String for the local address.
  446       private: std::string localAddress;
 
  /// \brief String for the remote URI.
  449       private: std::string remoteURI;
 
  /// \brief String for the remote address.
  452       private: std::string remoteAddress;
 
  /// \brief Boolean flag. NOTE(review): presumably set when connecting
  /// fails -- confirm.
  455       private: 
bool connectError;
 
  /// \brief String holding an IP white list; its format is not visible in
  /// this excerpt.
  458       private: std::string ipWhiteList;
 
  /// \brief Raw char pointer; allocation and ownership are not visible in
  /// this excerpt.
  461       private: 
char *headerBuffer;
 
  /// \brief Boolean flag. NOTE(review): presumably records that a
  /// dropped-message warning was already logged -- confirm.
  464       private: 
bool dropMsgLogged;
 
  /// \brief Unsigned index; its use is not visible in this excerpt.
  468       private: 
unsigned int callbackIndex;
 
  /// \brief Open flag; set to false by the read handlers when a read ends
  /// with end-of-file.
  471       private: 
bool isOpen;