20 #include <google/protobuf/message.h>
22 #include <boost/asio.hpp>
23 #include <boost/bind.hpp>
24 #include <boost/function.hpp>
25 #include <boost/thread.hpp>
26 #include <boost/tuple/tuple.hpp>
39 #define HEADER_LENGTH 8
55 class Connection :
public boost::enable_shared_from_this<Connection>
64 public:
bool Connect(
const std::string &host,
unsigned int port);
71 typedef boost::function<void(const std::string &data)>
ReadCallback;
83 public:
bool IsOpen()
const;
86 private:
void Close();
92 public:
bool Read(std::string &data);
95 public:
void EnqueueMsg(
const std::string &_buffer,
bool _force =
false);
122 public:
template<
typename Handler>
127 gzerr <<
"AsyncRead on a closed socket\n";
131 void (
Connection::*f)(
const boost::system::error_code &,
132 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
135 boost::asio::async_read(*this->socket,
136 boost::asio::buffer(this->inbound_header),
138 boost::asio::placeholders::error,
139 boost::make_tuple(handler)));
145 private:
template<
typename Handler>
146 void OnReadHeader(
const boost::system::error_code &_e,
147 boost::tuple<Handler> _handler)
151 if (_e.message() !=
"End of File")
160 std::size_t inbound_data_size = 0;
161 std::string header(&this->inbound_header[0],
162 this->inbound_header.size());
163 this->inbound_header.clear();
165 inbound_data_size = this->ParseHeader(header);
167 if (inbound_data_size > 0)
170 this->inbound_data.resize(inbound_data_size);
172 void (
Connection::*f)(
const boost::system::error_code &e,
173 boost::tuple<Handler>) =
174 &Connection::OnReadData<Handler>;
176 boost::asio::async_read(*this->socket,
177 boost::asio::buffer(this->inbound_data),
179 boost::asio::placeholders::error,
184 gzerr <<
"Header is empty\n";
185 boost::get<0>(_handler)(
"");
202 private:
template<
typename Handler>
203 void OnReadData(
const boost::system::error_code &e,
204 boost::tuple<Handler> handler)
207 gzerr <<
"Error Reading data!\n";
210 std::string data(&this->inbound_data[0],
211 this->inbound_data.size());
212 this->inbound_data.clear();
215 gzerr <<
"OnReadData got empty data!!!\n";
219 boost::get<0>(handler)(data);
225 {
return this->shutdown.
Connect(subscriber_); }
233 private:
void OnWrite(
const boost::system::error_code &e,
234 boost::asio::streambuf *_b);
237 private:
void OnAccept(
const boost::system::error_code &e);
240 private: std::size_t ParseHeader(
const std::string &header);
246 private: boost::asio::ip::tcp::endpoint GetLocalEndpoint()
const;
249 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint()
const;
251 private:
static std::string GetHostname(boost::asio::ip::tcp::endpoint ep);
253 private:
void OnConnect(
const boost::system::error_code &_error,
254 boost::asio::ip::tcp::resolver::iterator _endPointIter);
256 private: boost::asio::ip::tcp::socket *socket;
257 private: boost::asio::ip::tcp::acceptor *acceptor;
259 private: std::deque<std::string> writeQueue;
260 private: std::deque<unsigned int> writeCounts;
261 private: boost::mutex *connectMutex;
262 private: boost::recursive_mutex *writeMutex;
263 private: boost::recursive_mutex *readMutex;
265 private: boost::condition_variable connectCondition;
270 private: std::vector<char> inbound_header;
271 private: std::vector<char> inbound_data;
273 private: boost::thread *readThread;
274 private:
bool readQuit;
276 public:
unsigned int id;
277 private:
static unsigned int idCounter;
285 private: std::string localURI;
286 private: std::string localAddress;
287 private: std::string remoteURI;
288 private: std::string remoteAddress;
290 private:
bool connectError;