All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2011 Nate Koenig
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef CONNECTION_HH
18 #define CONNECTION_HH
19 
20 #include <google/protobuf/message.h>
21 
22 #include <boost/asio.hpp>
23 #include <boost/bind.hpp>
24 #include <boost/function.hpp>
25 #include <boost/thread.hpp>
26 #include <boost/tuple/tuple.hpp>
27 
28 #include <string>
29 #include <vector>
30 #include <iostream>
31 #include <iomanip>
32 #include <deque>
33 
34 
35 #include "common/Event.hh"
36 #include "common/Console.hh"
37 #include "common/Exception.hh"
38 
/// \brief Size, in chars, of the fixed-length header that precedes every
/// message. AsyncRead resizes inbound_header to this length before it
/// starts reading the header from the socket.
39 #define HEADER_LENGTH 8
40 
41 namespace gazebo
42 {
43  namespace transport
44  {
/// \brief Declared here, defined elsewhere. Connection::OnReadData does
/// not invoke the user's read handler while this returns true.
45  extern bool is_stopped();
46 
/// \brief Forward declaration; Connection keeps a static pointer to one.
47  class IOManager;
/// \brief Forward declaration so that ConnectionPtr can be named before
/// the class definition.
48  class Connection;
/// \brief Shared pointer to a Connection.
49  typedef boost::shared_ptr<Connection> ConnectionPtr;
50 
53 
/// \brief A TCP connection used by the transport layer.
///
/// Incoming messages are read asynchronously in two steps: a header of
/// HEADER_LENGTH chars (AsyncRead -> OnReadHeader) that is parsed into a
/// body size, followed by a body of that size (OnReadData). All other
/// member functions are only declared here; notes on them below are
/// hedged where the behaviour is not visible in this header.
55  class Connection : public boost::enable_shared_from_this<Connection>
56  {
/// \brief Constructor (defined out of line).
58  public: Connection();
59 
/// \brief Destructor (defined out of line).
61  public: virtual ~Connection();
62 
/// \brief Connect to a remote host.
/// \param[in] host Remote host (presumably a name or address; confirm
/// against the implementation).
/// \param[in] port Remote port.
/// \return Presumably true on success; confirm against the
/// implementation.
64  public: bool Connect(const std::string &host, unsigned int port);
65 
/// \brief Callback type that receives a ConnectionPtr; taken by Listen().
66  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
67 
/// \brief Listen on a port.
/// \param[in] port Port to listen on.
/// \param[in] accept_cb Callback for accepted connections (presumably
/// kept in acceptCB; confirm against the implementation).
69  public: void Listen(unsigned int port, const AcceptCallback &accept_cb);
70 
/// \brief Callback type that receives the data of one message as a string.
71  typedef boost::function<void(const std::string &data)> ReadCallback;
/// \brief Start reading, delivering data to the given callback.
/// NOTE(review): presumably runs ReadLoop on readThread; confirm.
/// \param[in] cb Callback that receives the data read.
74  public: void StartRead(const ReadCallback &cb);
75 
/// \brief Stop reading (counterpart of StartRead, by name; implementation
/// not visible here).
77  public: void StopRead();
78 
/// \brief Shut the connection down (implementation not visible here).
80  public: void Shutdown();
81 
/// \brief Report whether the connection is open. AsyncRead refuses to
/// start a read when this returns false.
/// \return True if the connection is open.
83  public: bool IsOpen() const;
84 
/// \brief Close the connection. Called by OnReadHeader when the header
/// read reports an error.
86  private: void Close();
87 
/// \brief Cancel (presumably pending asynchronous operations; confirm
/// against the implementation).
89  public: void Cancel();
90 
/// \brief Read data into a caller-supplied string.
/// NOTE(review): presumably the blocking counterpart of AsyncRead; the
/// meaning of the return value is not visible here.
/// \param[out] data String that receives the data.
92  public: bool Read(std::string &data);
93 
/// \brief Queue a message for writing.
/// \param[in] _buffer Message to send.
/// \param[in] _force Defaults to false; its effect is defined by the
/// implementation, which is not visible here.
95  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
96 
/// \brief Get the local URI.
98  public: std::string GetLocalURI() const;
99 
/// \brief Get the remote URI.
101  public: std::string GetRemoteURI() const;
102 
/// \brief Get the local address.
104  public: std::string GetLocalAddress() const;
105 
/// \brief Get the local port.
107  public: unsigned int GetLocalPort() const;
108 
/// \brief Get the remote address.
110  public: std::string GetRemoteAddress() const;
111 
/// \brief Get the remote port.
113  public: unsigned int GetRemotePort() const;
114 
/// \brief Get the remote hostname.
116  public: std::string GetRemoteHostname() const;
117 
/// \brief Get the local hostname.
119  public: std::string GetLocalHostname() const;
120 
122  public: template<typename Handler>
123  void AsyncRead(Handler handler)
124  {
125  if (!this->IsOpen())
126  {
127  gzerr << "AsyncRead on a closed socket\n";
128  return;
129  }
130 
131  void (Connection::*f)(const boost::system::error_code &,
132  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
133 
134  this->inbound_header.resize(HEADER_LENGTH);
135  boost::asio::async_read(*this->socket,
136  boost::asio::buffer(this->inbound_header),
137  boost::bind(f, this,
138  boost::asio::placeholders::error,
139  boost::make_tuple(handler)));
140  }
141 
142  // Handle a completed read of a message header. The handler is passed
143  // using a tuple since boost::bind seems to have trouble binding
144  // a function object created using boost::bind as a parameter
145  private: template<typename Handler>
146  void OnReadHeader(const boost::system::error_code &_e,
147  boost::tuple<Handler> _handler)
148  {
149  if (_e)
150  {
151  if (_e.message() != "End of File")
152  {
153  this->Close();
154  // This will occur when the other side closes the
155  // connection
156  }
157  }
158  else
159  {
160  std::size_t inbound_data_size = 0;
161  std::string header(&this->inbound_header[0],
162  this->inbound_header.size());
163  this->inbound_header.clear();
164 
165  inbound_data_size = this->ParseHeader(header);
166 
167  if (inbound_data_size > 0)
168  {
169  // Start the asynchronous call to receive data
170  this->inbound_data.resize(inbound_data_size);
171 
172  void (Connection::*f)(const boost::system::error_code &e,
173  boost::tuple<Handler>) =
174  &Connection::OnReadData<Handler>;
175 
176  boost::asio::async_read(*this->socket,
177  boost::asio::buffer(this->inbound_data),
178  boost::bind(f, this,
179  boost::asio::placeholders::error,
180  _handler));
181  }
182  else
183  {
184  gzerr << "Header is empty\n";
185  boost::get<0>(_handler)("");
186  // This code tries to read the header again. We should
187  // never get here.
188  // this->inbound_header.resize(HEADER_LENGTH);
189 
190  // void (Connection::*f)(const boost::system::error_code &,
191  // boost::tuple<Handler>) =
192  // &Connection::OnReadHeader<Handler>;
193 
194  // boost::asio::async_read(*this->socket,
195  // boost::asio::buffer(this->inbound_header),
196  // boost::bind(f, this,
197  // boost::asio::placeholders::error, _handler));
198  }
199  }
200  }
201 
202  private: template<typename Handler>
203  void OnReadData(const boost::system::error_code &e,
204  boost::tuple<Handler> handler)
205  {
206  if (e)
207  gzerr << "Error Reading data!\n";
208 
209  // Inform caller that data has been received
210  std::string data(&this->inbound_data[0],
211  this->inbound_data.size());
212  this->inbound_data.clear();
213 
214  if (data.empty())
215  gzerr << "OnReadData got empty data!!!\n";
216 
217  if (!e && !transport::is_stopped())
218  {
219  boost::get<0>(handler)(data);
220  }
221  }
222 
223  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
224  subscriber_)
225  { return this->shutdown.Connect(subscriber_); }
226 
227  public: void DisconnectShutdown(event::ConnectionPtr subscriber_)
228  {this->shutdown.Disconnect(subscriber_);}
229 
/// \brief Process the queue of outgoing messages (by name; the
/// implementation is not visible here).
231  public: void ProcessWriteQueue();
232 
/// \brief Presumably the completion handler for an asynchronous write;
/// confirm against the implementation.
/// \param[in] e Result of the operation.
/// \param[in] _b Stream buffer associated with the write (ownership is not
/// visible here).
233  private: void OnWrite(const boost::system::error_code &e,
234  boost::asio::streambuf *_b);
235 
/// \brief Presumably the completion handler for an asynchronous accept;
/// confirm against the implementation.
/// \param[in] e Result of the operation.
237  private: void OnAccept(const boost::system::error_code &e);
238 
/// \brief Parse a message header.
/// \param[in] header The header chars received from the socket.
/// \return Size of the message body that follows. OnReadHeader resizes
/// inbound_data to this value and treats 0 as an empty header.
240  private: std::size_t ParseHeader(const std::string &header);
241 
/// \brief Read loop (presumably run on readThread by StartRead; confirm
/// against the implementation).
/// \param[in] cb Callback that receives the data read.
243  private: void ReadLoop(const ReadCallback &cb);
244 
/// \brief Get the local endpoint.
246  private: boost::asio::ip::tcp::endpoint GetLocalEndpoint() const;
247 
/// \brief Get the remote endpoint.
249  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
250 
/// \brief Get the hostname for an endpoint.
/// \param[in] ep Endpoint to look up.
251  private: static std::string GetHostname(boost::asio::ip::tcp::endpoint ep);
252 
/// \brief Presumably the completion handler for an asynchronous connect;
/// confirm against the implementation.
/// \param[in] _error Result of the operation.
/// \param[in] _endPointIter Resolver iterator for the endpoint(s) involved.
253  private: void OnConnect(const boost::system::error_code &_error,
254  boost::asio::ip::tcp::resolver::iterator _endPointIter);
255 
/// \brief TCP socket that AsyncRead and OnReadHeader read from.
/// Allocation and ownership are handled out of line.
256  private: boost::asio::ip::tcp::socket *socket;
/// \brief TCP acceptor (presumably used by Listen; confirm).
257  private: boost::asio::ip::tcp::acceptor *acceptor;
258 
/// \brief Queue of outgoing message strings.
259  private: std::deque<std::string> writeQueue;
/// \brief Queue of counts kept alongside writeQueue (exact meaning is not
/// visible here).
260  private: std::deque<unsigned int> writeCounts;
/// \brief Mutex (presumably paired with connectCondition; confirm).
261  private: boost::mutex *connectMutex;
/// \brief Recursive mutex (presumably guards the write path; confirm).
262  private: boost::recursive_mutex *writeMutex;
/// \brief Recursive mutex (presumably guards the read path; confirm).
263  private: boost::recursive_mutex *readMutex;
264 
/// \brief Condition variable (presumably signalled from OnConnect;
/// confirm).
265  private: boost::condition_variable connectCondition;
266 
267  // Called when a new connection is received
268  private: AcceptCallback acceptCB;
269 
/// \brief Receive buffer for a message header. Sized to HEADER_LENGTH by
/// AsyncRead and cleared by OnReadHeader once the header has been copied.
270  private: std::vector<char> inbound_header;
/// \brief Receive buffer for a message body. Sized by OnReadHeader from
/// the parsed header and cleared by OnReadData once the data is copied.
271  private: std::vector<char> inbound_data;
272 
/// \brief Thread pointer (presumably the thread running ReadLoop; confirm).
273  private: boost::thread *readThread;
/// \brief Flag (presumably tells the read loop to exit; confirm).
274  private: bool readQuit;
275 
/// \brief Identifier of this connection (public data member).
276  public: unsigned int id;
/// \brief Static counter (presumably the source of id values; confirm).
277  private: static unsigned int idCounter;
/// \brief Connection pointer (presumably the connection currently being
/// accepted; confirm).
278  private: ConnectionPtr acceptConn;
279 
/// \brief Shutdown event; ConnectToShutdown and DisconnectShutdown add and
/// remove its subscribers.
280  private: event::EventT<void()> shutdown;
/// \brief IOManager shared by all connections (static member).
281  private: static IOManager *iomanager;
282 
/// \brief Write counter (public data member; maintained out of line).
283  public: unsigned int writeCount;
284 
/// \brief Local URI string (presumably what GetLocalURI returns; confirm).
285  private: std::string localURI;
/// \brief Local address string.
286  private: std::string localAddress;
/// \brief Remote URI string.
287  private: std::string remoteURI;
/// \brief Remote address string.
288  private: std::string remoteAddress;
289 
/// \brief Flag (presumably set when a connect attempt fails; confirm).
290  private: bool connectError;
291  };
293  }
294 }
295 
296 #endif
297 
298