17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
21 #include <google/protobuf/message.h>
23 #include <boost/asio.hpp>
24 #include <boost/bind.hpp>
25 #include <boost/function.hpp>
26 #include <boost/thread.hpp>
27 #include <boost/tuple/tuple.hpp>
/// Constant 8, named as a header length.
/// NOTE(review): presumably the size in bytes of the fixed-length message
/// header read into Connection::inboundHeader; its use is not visible in this
/// excerpt -- confirm.
40 #define HEADER_LENGTH 8
/// \brief Task type derived from tbb::task that holds a string callback and
/// a string; execute() calls the stored function with the stored data.
55 class ConnectionReadTask :
public tbb::task
/// \brief Constructor.
/// \param[in] _func Function taking a const std::string reference.
/// \param[in] _data String argument.
/// NOTE(review): the constructor body is not visible in this excerpt;
/// presumably it copies _func and _data into the func and data members.
61 public: ConnectionReadTask(
62 boost::function<
void (
const std::string &)> _func,
63 const std::string &_data)
/// \brief execute() member; invokes func with data.
/// NOTE(review): the return statement is not visible in this excerpt.
71 public: tbb::task *execute()
73 this->func(this->data);
/// \brief Callback invoked by execute().
78 private: boost::function<void (const std::string &)> func;
/// \brief Argument passed to func by execute().
81 private: std::string data;
/// \brief Connection object built around a boost::asio TCP socket and
/// acceptor (see the socket and acceptor data members).
/// Derives from boost::enable_shared_from_this<Connection>.
99 class Connection :
public boost::enable_shared_from_this<Connection>
/// \brief Declaration only; the definition is not in this header excerpt.
/// \param[in] _host Host name or address, as a string.
/// \param[in] _port Port number.
/// \return bool. NOTE(review): presumably true when the connection to
/// _host:_port was established -- confirm against the definition.
111 public:
bool Connect(
const std::string &_host,
unsigned int _port);
/// \brief Const query returning a bool; declaration only.
/// NOTE(review): presumably reports whether the connection is open (an
/// isOpen flag exists among the data members) -- confirm against the
/// definition.
138 public:
bool IsOpen()
const;
/// \brief Private member taking no arguments; declaration only.
/// NOTE(review): presumably closes the socket -- definition not visible here.
141 private:
void Close();
/// \brief Declaration only; takes a non-const string reference.
/// \param[out] _data NOTE(review): presumably receives the data that was
/// read -- confirm against the definition.
/// \return bool. NOTE(review): presumably true on a successful read.
149 public:
bool Read(std::string &_data);
/// \brief EnqueueMsg overload that also takes a callback and an id;
/// declaration only.
/// \param[in] _buffer Message contents.
/// \param[in] _cb Function taking a uint32_t.
/// \param[in] _id uint32_t value accompanying _cb. NOTE(review): presumably
/// _cb is called with _id once the message has been written (the callbacks
/// member pairs such a function with a uint32_t) -- confirm.
/// \param[in] _force Defaults to false. NOTE(review): meaning is not visible
/// in this excerpt.
158 public:
void EnqueueMsg(
const std::string &_buffer,
159 boost::function<
void(uint32_t)> _cb, uint32_t _id,
160 bool _force =
false);
/// \brief EnqueueMsg overload without a callback; declaration only.
/// \param[in] _buffer Message contents.
/// \param[in] _force Defaults to false. NOTE(review): meaning is not visible
/// in this excerpt.
166 public:
void EnqueueMsg(
const std::string &_buffer,
bool _force =
false);
/// \brief Public member template parameterised on the handler type.
/// NOTE(review): the line carrying the function name and parameter list is
/// not present in this excerpt; the log message below suggests the name is
/// AsyncRead, and the body refers to a parameter called _handler.
/// The visible code starts an asynchronous read from the socket into
/// inboundHeader, with OnReadHeader<Handler> selected as the member function
/// pointer f and _handler wrapped in a one-element boost::tuple.
202 public:
template<
typename Handler>
// Error log for a closed socket (the guarding condition is not visible in
// this excerpt).
207 gzerr <<
"AsyncRead on a closed socket\n";
// Member function pointer to the OnReadHeader instantiation for Handler.
211 void (
Connection::*f)(
const boost::system::error_code &,
212 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
// Asynchronous read into the header buffer. NOTE(review): the line that
// binds f as the completion handler is not visible in this excerpt.
215 boost::asio::async_read(*this->socket,
216 boost::asio::buffer(this->inboundHeader),
218 boost::asio::placeholders::error,
219 boost::make_tuple(_handler)));
/// \brief Read handler that receives the error code of a header read and the
/// user handler wrapped in a boost::tuple.
/// The visible code marks the connection as not open on "End of file",
/// converts inboundHeader to a string, obtains a payload size from
/// ParseHeader, and for a positive size starts an asynchronous read into
/// inboundData with OnReadData<Handler> as the member function pointer.
/// NOTE(review): braces and some branch lines are not present in this
/// excerpt, so the exact nesting of these steps cannot be confirmed here.
/// \param[in] _e Error code of the read.
/// \param[in] _handler One-element tuple holding the user callback; on the
/// "Header is empty" path it is invoked with an empty string.
229 private:
template<
typename Handler>
230 void OnReadHeader(
const boost::system::error_code &_e,
231 boost::tuple<Handler> _handler)
// NOTE(review): end-of-file is detected by comparing the human-readable
// message text. Boost.Asio also exposes this condition as the error value
// boost::asio::error::eof, which does not depend on the message wording.
235 if (_e.message() ==
"End of file")
236 this->isOpen =
false;
// Copy the received header bytes into a string, then empty the buffer.
// NOTE(review): indexes element 0, so this relies on inboundHeader being
// non-empty at this point.
240 std::size_t inboundData_size = 0;
241 std::string header(&this->inboundHeader[0],
242 this->inboundHeader.size());
243 this->inboundHeader.clear();
// ParseHeader yields the size used below for the payload buffer.
245 inboundData_size = this->ParseHeader(header);
247 if (inboundData_size > 0)
// Size the payload buffer and start an asynchronous read into it.
250 this->inboundData.resize(inboundData_size);
252 void (
Connection::*f)(
const boost::system::error_code &e,
253 boost::tuple<Handler>) =
254 &Connection::OnReadData<Handler>;
// NOTE(review): the line that binds f as the completion handler is not
// visible in this excerpt.
256 boost::asio::async_read(*this->socket,
257 boost::asio::buffer(this->inboundData),
259 boost::asio::placeholders::error,
// Report the problem and call the user callback with an empty string (the
// enclosing branch is not visible in this excerpt).
264 gzerr <<
"Header is empty\n";
265 boost::get<0>(_handler)(
"");
/// \brief Read handler that receives the error code of a payload read and
/// the user handler wrapped in a boost::tuple.
/// The visible code marks the connection as not open on "End of file",
/// copies inboundData into a string, and enqueues a ConnectionReadTask built
/// from the user callback and that string.
/// NOTE(review): braces and some branch lines are not present in this
/// excerpt, so the exact nesting of these steps cannot be confirmed here.
/// \param[in] _e Error code of the read.
/// \param[in] _handler One-element tuple holding the user callback.
289 private:
template<
typename Handler>
290 void OnReadData(
const boost::system::error_code &_e,
291 boost::tuple<Handler> _handler)
// NOTE(review): end-of-file is detected by comparing the human-readable
// message text. Boost.Asio also exposes this condition as the error value
// boost::asio::error::eof, which does not depend on the message wording.
295 if (_e.message() ==
"End of file")
296 this->isOpen =
false;
// Copy the payload bytes into a string, then empty the buffer.
// NOTE(review): indexes element 0 even if inboundData is empty; whether an
// empty buffer can reach this line is not visible in this excerpt.
300 std::string data(&this->inboundData[0],
301 this->inboundData.size());
302 this->inboundData.clear();
// Error log for empty data (the guarding condition is not visible in this
// excerpt).
305 gzerr <<
"OnReadData got empty data!!!\n";
// Allocate a root TBB task carrying the user callback and the data, then
// hand it to tbb::task::enqueue.
309 ConnectionReadTask *task =
new(tbb::task::allocate_root())
310 ConnectionReadTask(boost::get<0>(_handler), data);
311 tbb::task::enqueue(*task);
// Body of a member whose declaration line is not visible in this excerpt:
// forwards _subscriber to shutdown.Connect and returns the result.
323 {
return this->shutdown.
Connect(_subscriber); }
/// \brief Const accessor returning an unsigned int; declaration only.
/// NOTE(review): presumably returns the id data member -- confirm against
/// the definition.
336 public:
unsigned int GetId()
const;
/// \brief Static member taking an IP string and returning a bool;
/// declaration only.
/// \param[in] _ip IP address as a string.
/// \return bool. NOTE(review): the validation rule is not visible in this
/// excerpt (an ipWhiteList member exists; whether it is consulted here is
/// unconfirmed).
341 public:
static bool ValidateIP(
const std::string &_ip);
/// \brief Private member taking an error code; declaration only.
/// NOTE(review): presumably the completion handler for an asynchronous
/// write -- confirm against the definition.
/// \param[in] _e Error code.
351 private:
void OnWrite(
const boost::system::error_code &_e);
/// \brief Private member taking an error code; declaration only.
/// NOTE(review): presumably the completion handler for an asynchronous
/// accept -- confirm against the definition.
/// \param[in] _e Error code.
355 private:
void OnAccept(
const boost::system::error_code &_e);
/// \brief Declaration only. OnReadHeader passes the received header string
/// to this function and uses the result as the size of the payload buffer.
/// \param[in] _header Header contents as a string.
/// \return Size value; OnReadHeader treats a result of 0 as an empty header.
359 private: std::size_t ParseHeader(
const std::string &_header);
/// \brief Static member returning a TCP endpoint; declaration only.
/// NOTE(review): how the local endpoint is chosen is not visible in this
/// excerpt.
366 private:
static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
/// \brief Const member returning a TCP endpoint; declaration only.
/// NOTE(review): presumably the endpoint of the peer on this connection's
/// socket -- confirm against the definition.
370 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint()
const;
/// \brief Static member mapping a TCP endpoint to a string; declaration only.
/// \param[in] _ep Endpoint, passed by value.
/// \return std::string. NOTE(review): presumably the host name or address
/// text for _ep -- confirm against the definition.
374 private:
static std::string GetHostname(
375 boost::asio::ip::tcp::endpoint _ep);
/// \brief Private member taking an error code and a resolver iterator;
/// declaration only.
/// NOTE(review): presumably the completion handler for an asynchronous
/// connect -- confirm against the definition.
/// \param[in] _error Error code.
/// \param[in] _endPointIter Resolver iterator, passed by value.
380 private:
void OnConnect(
const boost::system::error_code &_error,
381 boost::asio::ip::tcp::resolver::iterator _endPointIter);
/// \brief TCP socket used by the asynchronous reads in this class. Held as a
/// raw pointer; allocation and ownership are not visible in this excerpt.
384 private: boost::asio::ip::tcp::socket *socket;
/// \brief TCP acceptor, held as a raw pointer; allocation and ownership are
/// not visible in this excerpt.
387 private: boost::asio::ip::tcp::acceptor *acceptor;
/// \brief Queue of strings. NOTE(review): presumably outgoing messages
/// awaiting a write -- confirm.
390 private: std::deque<std::string> writeQueue;
// Continuation of a declaration whose first line is not visible in this
// excerpt; the element type pairs a uint32_t callback with a uint32_t.
395 std::pair<boost::function<void(uint32_t)>, uint32_t> > callbacks;
/// \brief Mutex. NOTE(review): presumably paired with connectCondition.
398 private: boost::mutex connectMutex;
/// \brief Recursive mutex. NOTE(review): presumably guards write state.
401 private: boost::recursive_mutex writeMutex;
/// \brief Recursive mutex. NOTE(review): presumably guards read state.
404 private: boost::recursive_mutex readMutex;
/// \brief Mutex declared mutable. NOTE(review): presumably guards the
/// socket and is lockable from const members -- confirm.
407 private:
mutable boost::mutex socketMutex;
/// \brief Condition variable. NOTE(review): presumably signalled when a
/// connection attempt completes -- confirm.
410 private: boost::condition_variable connectCondition;
/// \brief Buffer filled by the header read and converted to a string in
/// OnReadHeader, which then clears it.
416 private: std::vector<char> inboundHeader;
/// \brief Payload buffer: resized in OnReadHeader to the parsed size,
/// converted to a string and cleared in OnReadData.
419 private: std::vector<char> inboundData;
/// \brief Boolean flag. NOTE(review): use is not visible in this excerpt.
422 private:
bool readQuit;
/// \brief Unsigned identifier value. NOTE(review): presumably unique per
/// connection and derived from idCounter -- confirm.
425 private:
unsigned int id;
/// \brief Static counter shared by all Connection objects.
428 private:
static unsigned int idCounter;
/// \brief Unsigned counter. NOTE(review): use is not visible in this excerpt.
440 private:
unsigned int writeCount;
/// \brief String fields named for the local and remote URI and address.
/// NOTE(review): where they are set is not visible in this excerpt.
443 private: std::string localURI;
446 private: std::string localAddress;
449 private: std::string remoteURI;
452 private: std::string remoteAddress;
/// \brief Boolean flag. NOTE(review): presumably records a failed connect.
455 private:
bool connectError;
/// \brief String named as an IP white list. NOTE(review): format and use
/// are not visible in this excerpt.
458 private: std::string ipWhiteList;
/// \brief Raw char pointer. NOTE(review): allocation, size and ownership
/// are not visible in this excerpt.
461 private:
char *headerBuffer;
/// \brief Boolean flag. NOTE(review): presumably prevents logging a dropped
/// message more than once -- confirm.
464 private:
bool dropMsgLogged;
/// \brief Unsigned index. NOTE(review): presumably an index into callbacks.
468 private:
unsigned int callbackIndex;
/// \brief Open flag; set to false by OnReadHeader and OnReadData when the
/// error message reads "End of file".
471 private:
bool isOpen;