17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
21 #include <google/protobuf/message.h>
23 #include <boost/asio.hpp>
24 #include <boost/bind.hpp>
25 #include <boost/function.hpp>
26 #include <boost/thread.hpp>
27 #include <boost/tuple/tuple.hpp>
41 #define HEADER_LENGTH 8
62 public: ConnectionReadTask(
63 boost::function<
void (
const std::string &)> _func,
64 const std::string &_data)
72 public: tbb::task *execute()
74 this->func(this->data);
79 private: boost::function<void (const std::string &)> func;
82 private: std::string data;
101 public boost::enable_shared_from_this<Connection>
113 public:
bool Connect(
const std::string &_host,
unsigned int _port);
122 public:
void Listen(
unsigned int _port,
const AcceptCallback &_acceptCB);
133 public:
void StopRead();
136 public:
void Shutdown();
140 public:
bool IsOpen()
const;
143 private:
void Close();
146 public:
void Cancel();
151 public:
bool Read(std::string &_data);
160 public:
void EnqueueMsg(
const std::string &_buffer,
161 boost::function<
void(uint32_t)> _cb, uint32_t _id,
162 bool _force =
false);
168 public:
void EnqueueMsg(
const std::string &_buffer,
bool _force =
false);
172 public: std::string GetLocalURI()
const;
176 public: std::string GetRemoteURI()
const;
180 public: std::string GetLocalAddress()
const;
184 public:
unsigned int GetLocalPort()
const;
188 public: std::string GetRemoteAddress()
const;
192 public:
unsigned int GetRemotePort()
const;
196 public: std::string GetRemoteHostname()
const;
200 public:
static std::string GetLocalHostname();
204 public:
template<
typename Handler>
209 gzerr <<
"AsyncRead on a closed socket\n";
213 void (
Connection::*f)(
const boost::system::error_code &,
214 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
217 boost::asio::async_read(*this->socket,
218 boost::asio::buffer(this->inboundHeader),
220 boost::asio::placeholders::error,
221 boost::make_tuple(_handler)));
231 private:
template<
typename Handler>
232 void OnReadHeader(
const boost::system::error_code &_e,
233 boost::tuple<Handler> _handler)
237 if (_e.message() ==
"End of file")
238 this->isOpen =
false;
242 std::size_t inboundData_size = 0;
243 std::string header(&this->inboundHeader[0],
244 this->inboundHeader.size());
245 this->inboundHeader.clear();
247 inboundData_size = this->ParseHeader(header);
249 if (inboundData_size > 0)
252 this->inboundData.resize(inboundData_size);
254 void (Connection::*f)(
const boost::system::error_code &e,
255 boost::tuple<Handler>) =
256 &Connection::OnReadData<Handler>;
258 boost::asio::async_read(*this->socket,
259 boost::asio::buffer(this->inboundData),
261 boost::asio::placeholders::error,
266 gzerr <<
"Header is empty\n";
267 boost::get<0>(_handler)(
"");
291 private:
template<
typename Handler>
292 void OnReadData(
const boost::system::error_code &_e,
293 boost::tuple<Handler> _handler)
297 if (_e.message() ==
"End of file")
298 this->isOpen =
false;
302 std::string data(&this->inboundData[0],
303 this->inboundData.size());
304 this->inboundData.clear();
307 gzerr <<
"OnReadData got empty data!!!\n";
311 ConnectionReadTask *task =
new(tbb::task::allocate_root())
312 ConnectionReadTask(boost::get<0>(_handler), data);
313 tbb::task::enqueue(*task);
325 {
return this->
shutdown.Connect(_subscriber); }
331 {this->
shutdown.Disconnect(_subscriber);}
334 public:
void ProcessWriteQueue(
bool _blocking =
false);
338 public:
unsigned int GetId()
const;
343 public:
static bool ValidateIP(
const std::string &_ip);
348 public: std::string GetIPWhiteList()
const;
353 private:
void OnWrite(
const boost::system::error_code &_e);
357 private:
void OnAccept(
const boost::system::error_code &_e);
361 private: std::size_t ParseHeader(
const std::string &_header);
364 private:
void ReadLoop(
const ReadCallback &_cb);
368 private:
static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
372 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint()
const;
376 private:
static std::string GetHostname(
377 boost::asio::ip::tcp::endpoint _ep);
382 private:
void OnConnect(
const boost::system::error_code &_error,
383 boost::asio::ip::tcp::resolver::iterator _endPointIter);
386 private: boost::asio::ip::tcp::socket *socket;
389 private: boost::asio::ip::tcp::acceptor *acceptor;
392 private: std::deque<std::string> writeQueue;
397 std::pair<boost::function<void(uint32_t)>, uint32_t> > callbacks;
400 private: boost::mutex connectMutex;
403 private: boost::recursive_mutex writeMutex;
406 private: boost::recursive_mutex readMutex;
409 private:
mutable boost::mutex socketMutex;
412 private: boost::condition_variable connectCondition;
415 private: AcceptCallback acceptCB;
418 private: std::vector<char> inboundHeader;
421 private: std::vector<char> inboundData;
424 private:
bool readQuit;
427 private:
unsigned int id;
430 private:
static unsigned int idCounter;
442 private:
unsigned int writeCount;
445 private: std::string localURI;
448 private: std::string localAddress;
451 private: std::string remoteURI;
454 private: std::string remoteAddress;
457 private:
bool connectError;
460 private: std::string ipWhiteList;
463 private:
char *headerBuffer;
466 private:
bool dropMsgLogged;
470 private:
unsigned int callbackIndex;
473 private:
bool isOpen;