17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
20 #include <google/protobuf/message.h>
22 #include <boost/asio.hpp>
23 #include <boost/bind.hpp>
24 #include <boost/function.hpp>
25 #include <boost/thread.hpp>
26 #include <boost/tuple/tuple.hpp>
39 #define HEADER_LENGTH 8
56 class Connection :
public boost::enable_shared_from_this<Connection>
68 public:
bool Connect(
const std::string &_host,
unsigned int _port);
80 typedef boost::function<void(const std::string &_data)>
ReadCallback;
95 public:
bool IsOpen()
const;
98 private:
void Close();
106 public:
bool Read(std::string &_data);
112 public:
void EnqueueMsg(
const std::string &_buffer,
bool _force =
false);
148 public:
template<
typename Handler>
153 gzerr <<
"AsyncRead on a closed socket\n";
157 void (
Connection::*f)(
const boost::system::error_code &,
158 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
161 boost::asio::async_read(*this->socket,
162 boost::asio::buffer(this->inboundHeader),
164 boost::asio::placeholders::error,
165 boost::make_tuple(_handler)));
175 private:
template<
typename Handler>
176 void OnReadHeader(
const boost::system::error_code &_e,
177 boost::tuple<Handler> _handler)
181 if (_e.message() !=
"End of File")
190 std::size_t inboundData_size = 0;
191 std::string header(&this->inboundHeader[0],
192 this->inboundHeader.size());
193 this->inboundHeader.clear();
195 inboundData_size = this->ParseHeader(header);
197 if (inboundData_size > 0)
200 this->inboundData.resize(inboundData_size);
202 void (
Connection::*f)(
const boost::system::error_code &e,
203 boost::tuple<Handler>) =
204 &Connection::OnReadData<Handler>;
206 boost::asio::async_read(*this->socket,
207 boost::asio::buffer(this->inboundData),
209 boost::asio::placeholders::error,
214 gzerr <<
"Header is empty\n";
215 boost::get<0>(_handler)(
"");
239 private:
template<
typename Handler>
240 void OnReadData(
const boost::system::error_code &_e,
241 boost::tuple<Handler> _handler)
244 gzerr <<
"Error Reading data!\n";
247 std::string data(&this->inboundData[0],
248 this->inboundData.size());
249 this->inboundData.clear();
252 gzerr <<
"OnReadData got empty data!!!\n";
256 boost::get<0>(_handler)(data);
265 {
return this->shutdown.
Connect(_subscriber); }
278 public:
unsigned int GetId()
const;
283 private:
void OnWrite(
const boost::system::error_code &e,
284 boost::asio::streambuf *_b);
288 private:
void OnAccept(
const boost::system::error_code &_e);
292 private: std::size_t ParseHeader(
const std::string &_header);
299 private: boost::asio::ip::tcp::endpoint GetLocalEndpoint()
const;
303 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint()
const;
307 private:
static std::string GetHostname(
308 boost::asio::ip::tcp::endpoint _ep);
313 private:
void OnConnect(
const boost::system::error_code &_error,
314 boost::asio::ip::tcp::resolver::iterator _endPointIter);
317 private: boost::asio::ip::tcp::socket *socket;
320 private: boost::asio::ip::tcp::acceptor *acceptor;
323 private: std::deque<std::string> writeQueue;
326 private: boost::mutex *connectMutex;
329 private: boost::recursive_mutex *writeMutex;
332 private: boost::recursive_mutex *readMutex;
335 private: boost::condition_variable connectCondition;
341 private: std::vector<char> inboundHeader;
344 private: std::vector<char> inboundData;
347 private:
bool readQuit;
350 private:
unsigned int id;
353 private:
static unsigned int idCounter;
365 private:
unsigned int writeCount;
368 private: std::string localURI;
371 private: std::string localAddress;
374 private: std::string remoteURI;
377 private: std::string remoteAddress;
380 private:
bool connectError;