17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
21 #include <google/protobuf/message.h>
23 #include <boost/asio.hpp>
24 #include <boost/bind.hpp>
25 #include <boost/function.hpp>
26 #include <boost/thread.hpp>
27 #include <boost/tuple/tuple.hpp>
// Header length constant (value 8).
// NOTE(review): presumably the size of the fixed-length message header that
// is read into inboundHeader; its point of use is not visible in this
// excerpt - confirm.
40 #define HEADER_LENGTH 8
// Fragment of a task type that stores a callback and a string and later
// invokes the callback with that string. Presumably this is
// ConnectionReadTask, which OnReadData constructs with a handler and a
// string; the class header and braces are not visible in this excerpt.
//
// Constructor parameters:
//   _func  Callback accepting a const std::string &.
//   _data  String argument associated with the callback.
62 boost::function<
void (
const std::string &)> _func,
63 const std::string &_data)
// Run the stored callback on the stored string.
73 this->func(this->data);
// Callback that is invoked with `data`.
78 private: boost::function<void (const std::string &)> func;
// String handed to `func` when it is invoked.
81 private: std::string data;
// Connection class. It derives from boost::enable_shared_from_this so that
// an instance can obtain a shared pointer to itself.
// NOTE(review): the class braces and part of the body are not visible in
// this excerpt.
90 class Connection :
public boost::enable_shared_from_this<Connection>
// Declaration only; the implementation is not in this excerpt.
// Presumably connects to the given host and port - confirm.
//   _host  Host name or address, passed as a string.
//   _port  Port number.
// Returns a bool; presumably true on success - confirm.
102 public:
bool Connect(
const std::string &_host,
unsigned int _port);
// Const query returning a bool. Declaration only; presumably reports whether
// the connection's socket is open - confirm against the implementation.
129 public:
bool IsOpen()
const;
// Private helper, declaration only. Presumably closes the connection -
// confirm against the implementation.
132 private:
void Close();
// Declaration only. _data is a non-const reference, so it is presumably the
// output that receives the data that was read - confirm.
// Returns a bool; presumably true when data was read - confirm.
140 public:
bool Read(std::string &_data);
// Declaration only.
//   _buffer  Message contents, passed as a string.
//   _force   Optional flag, defaults to false. NOTE(review): its exact
//            effect is not visible in this excerpt - confirm.
146 public:
void EnqueueMsg(
const std::string &_buffer,
bool _force =
false);
// Fragment of a public member template, presumably AsyncRead judging by the
// log message below. Its signature line, the guard around the error message
// and the bind expression are not visible in this excerpt.
182 public:
template<
typename Handler>
// Error reported when a read is requested on a closed socket.
187 gzerr <<
"AsyncRead on a closed socket\n";
// Member-function pointer to the OnReadHeader<Handler> instantiation.
// Presumably bound as the completion callback of the read below - confirm.
191 void (
Connection::*f)(
const boost::system::error_code &,
192 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
// Start an asynchronous read from the socket into inboundHeader. The user
// handler is wrapped in a one-element tuple for the callback.
195 boost::asio::async_read(*this->socket,
196 boost::asio::buffer(this->inboundHeader),
198 boost::asio::placeholders::error,
199 boost::make_tuple(_handler)));
// Handler for a completed header read. It converts the header buffer into a
// string, parses it into a data size and, when that size is positive, starts
// an asynchronous read of the data into inboundData.
//   _e        Error code of the header read.
//   _handler  One-element tuple holding the user callback.
// NOTE(review): braces and some branch lines are not visible in this excerpt.
209 private:
template<
typename Handler>
210 void OnReadHeader(
const boost::system::error_code &_e,
211 boost::tuple<Handler> _handler)
// NOTE(review): this compares the human-readable message text against a
// literal. Message strings are not a stable contract; comparing the error
// code itself (e.g. against boost::asio::error::eof) would be more robust -
// confirm the intent. The statements guarded by this test are not visible.
215 if (_e.message() !=
"End of File")
// Copy the raw header bytes into a string, then empty the header buffer.
// NOTE(review): indexing [0] assumes inboundHeader is not empty - confirm.
226 std::size_t inboundData_size = 0;
227 std::string header(&this->inboundHeader[0],
228 this->inboundHeader.size());
229 this->inboundHeader.clear();
// The parsed header yields the number of bytes to read next.
231 inboundData_size = this->ParseHeader(header);
233 if (inboundData_size > 0)
// Size the data buffer to the length announced by the header.
236 this->inboundData.resize(inboundData_size);
// Member-function pointer to the OnReadData<Handler> instantiation.
// Presumably bound as the completion callback of the read below - confirm.
238 void (
Connection::*f)(
const boost::system::error_code &e,
239 boost::tuple<Handler>) =
240 &Connection::OnReadData<Handler>;
// Start an asynchronous read from the socket into inboundData.
242 boost::asio::async_read(*this->socket,
243 boost::asio::buffer(this->inboundData),
245 boost::asio::placeholders::error,
// Presumably the branch taken when the parsed size is not positive (the
// `else` is not visible): log the problem and call the user callback with an
// empty string.
250 gzerr <<
"Header is empty\n";
251 boost::get<0>(_handler)(
"");
// Handler for a completed data read. It copies the received bytes into a
// string and hands that string, together with the user callback, to a
// ConnectionReadTask that is enqueued with TBB.
//   _e        Error code of the data read.
//   _handler  One-element tuple holding the user callback.
// NOTE(review): braces and the conditions guarding the log messages are not
// visible in this excerpt.
275 private:
template<
typename Handler>
276 void OnReadData(
const boost::system::error_code &_e,
277 boost::tuple<Handler> _handler)
// Report a read error, including the error's message text.
281 gzerr <<
"Error Reading data["
282 << _e.message() <<
"]\n";
// Copy the received bytes into a string, then empty the data buffer.
// NOTE(review): indexing [0] assumes inboundData is not empty - confirm.
286 std::string data(&this->inboundData[0],
287 this->inboundData.size());
288 this->inboundData.clear();
// Message logged for empty data; the guarding condition is not visible here.
291 gzerr <<
"OnReadData got empty data!!!\n";
// Allocate a root TBB task wrapping the user callback and the data, then
// enqueue it. Presumably the callback is run by that task rather than
// directly here - confirm.
295 ConnectionReadTask *task =
new(tbb::task::allocate_root())
296 ConnectionReadTask(boost::get<0>(_handler), data);
298 tbb::task::enqueue(*task);
// Body of a member function whose signature is not visible in this excerpt.
// It forwards _subscriber to this->shutdown.Connect and returns the result.
307 {
return this->shutdown.
Connect(_subscriber); }
// Const accessor returning an unsigned int. Declaration only; presumably
// returns the `id` member - confirm against the implementation.
320 public:
unsigned int GetId()
const;
// Static function, declaration only.
//   _ip  Address to check, passed as a string.
// Returns a bool; presumably true when _ip is a valid IP address - confirm.
325 public:
static bool ValidateIP(
const std::string &_ip);
// Private callback, declaration only. Presumably invoked when an
// asynchronous write completes - confirm.
//   _e  Error code of the operation.
//   _b  Pointer to a stream buffer. NOTE(review): ownership of _b is not
//       visible in this excerpt - confirm who releases it.
330 private:
void OnWrite(
const boost::system::error_code &_e,
331 boost::asio::streambuf *_b);
// Private callback, declaration only. Presumably invoked when an accept
// operation completes - confirm.
//   _e  Error code of the operation.
335 private:
void OnAccept(
const boost::system::error_code &_e);
// Declaration only. OnReadHeader passes the received header string here and
// uses the returned value as the number of data bytes to read; a result of
// zero is reported there as "Header is empty".
//   _header  Header contents as a string.
// Returns the size obtained from the header.
339 private: std::size_t ParseHeader(
const std::string &_header);
// Static helper returning a TCP endpoint. Declaration only; presumably the
// endpoint of the local machine - confirm.
346 private:
static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
// Const helper returning a TCP endpoint. Declaration only; presumably the
// endpoint of the remote peer of this connection - confirm.
350 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint()
const;
// Static helper, declaration only.
//   _ep  TCP endpoint, passed by value.
// Returns a string; presumably the host name for _ep - confirm.
354 private:
static std::string GetHostname(
355 boost::asio::ip::tcp::endpoint _ep);
// Private callback, declaration only. Presumably invoked when a connect
// attempt completes - confirm.
//   _error         Error code of the operation.
//   _endPointIter  Resolver iterator; presumably the endpoint that was tried
//                  or the next one to try - confirm.
360 private:
void OnConnect(
const boost::system::error_code &_error,
361 boost::asio::ip::tcp::resolver::iterator _endPointIter);
// Data members of Connection. Notes marked "presumably" are inferred from
// names and types only; confirm them against the implementation.

// TCP socket used by the asynchronous reads. Raw pointer; ownership is not
// visible in this excerpt.
364 private: boost::asio::ip::tcp::socket *socket;
// TCP acceptor. Raw pointer; ownership is not visible in this excerpt.
367 private: boost::asio::ip::tcp::acceptor *acceptor;
// Queue of strings; presumably messages waiting to be written.
370 private: std::deque<std::string> writeQueue;
// Presumably used together with connectCondition during connection setup.
373 private: boost::mutex connectMutex;
// Recursive mutex; presumably guards write operations.
376 private: boost::recursive_mutex writeMutex;
// Recursive mutex; presumably guards read operations.
379 private: boost::recursive_mutex readMutex;
// Mutable so that const member functions can lock it; presumably guards
// access to the socket.
382 private:
mutable boost::mutex socketMutex;
// Condition variable; presumably signalled when a connect attempt finishes.
385 private: boost::condition_variable connectCondition;
// Buffer that receives the message header bytes.
391 private: std::vector<char> inboundHeader;
// Buffer that receives the message data bytes; resized to the size parsed
// from the header.
394 private: std::vector<char> inboundData;
// Presumably a flag asking the read logic to stop.
397 private:
bool readQuit;
// Presumably this connection's identifier.
400 private:
unsigned int id;
// Static counter; presumably the source of new `id` values.
403 private:
static unsigned int idCounter;
// Presumably the number of writes in progress or performed.
415 private:
unsigned int writeCount;
// Presumably the URI of the local end.
418 private: std::string localURI;
// Presumably the address of the local end.
421 private: std::string localAddress;
// Presumably the URI of the remote end.
424 private: std::string remoteURI;
// Presumably the address of the remote end.
427 private: std::string remoteAddress;
// Presumably set when a connect attempt fails.
430 private:
bool connectError;