All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
19 
20 #include <google/protobuf/message.h>
21 
22 #include <boost/asio.hpp>
23 #include <boost/bind.hpp>
24 #include <boost/function.hpp>
25 #include <boost/thread.hpp>
26 #include <boost/tuple/tuple.hpp>
27 
28 #include <string>
29 #include <vector>
30 #include <iostream>
31 #include <iomanip>
32 #include <deque>
33 
34 
35 #include "common/Event.hh"
36 #include "common/Console.hh"
37 #include "common/Exception.hh"
38 
/// \brief Number of chars in the fixed-size header that is read before each
/// message payload. AsyncRead() resizes the inbound header buffer to this
/// value before starting the header read.
/// NOTE(review): the header encoding itself is defined by ParseHeader(),
/// whose implementation is not visible in this header.
39 #define HEADER_LENGTH 8
40 
41 namespace gazebo
42 {
43  namespace transport
44  {
/// \brief Declared here, defined elsewhere. OnReadData() calls it and skips
/// the user callback when it returns true.
45  extern bool is_stopped();
46 
// Forward declarations; IOManager is only used through a pointer here.
47  class IOManager;
48  class Connection;
/// \brief Shared-pointer handle to a Connection.
49  typedef boost::shared_ptr<Connection> ConnectionPtr;
50 
53 
/// \brief A TCP connection built on boost::asio.
///
/// The class holds a tcp::socket and a tcp::acceptor (both by pointer), a
/// queue of outgoing strings, and buffers for an inbound header and an
/// inbound payload. Asynchronous reads are framed: AsyncRead() reads
/// HEADER_LENGTH chars, ParseHeader() turns them into a payload size, and
/// that many chars are then read and handed to the caller's handler.
/// Derives from enable_shared_from_this so a ConnectionPtr can be obtained
/// from inside the class.
56  class Connection : public boost::enable_shared_from_this<Connection>
57  {
/// \brief Default constructor (not defined in this header).
59  public: Connection();
60 
/// \brief Virtual destructor (not defined in this header).
62  public: virtual ~Connection();
63 
/// \brief Connect to a host and port.
/// \param[in] _host Host to connect to. Presumably a hostname or an IP
/// address; confirm against the implementation.
/// \param[in] _port Port number.
/// \return Presumably true on success; the implementation is not visible
/// in this header.
68  public: bool Connect(const std::string &_host, unsigned int _port);
69 
/// \brief Callback type accepted by Listen(); it receives a ConnectionPtr.
71  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
72 
/// \brief Listen on a port.
/// \param[in] _port Port number.
/// \param[in] _acceptCB Callback, presumably invoked with each newly
/// accepted connection; confirm against the implementation.
77  public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
78 
/// \brief Callback type accepted by StartRead(); it receives a std::string.
80  typedef boost::function<void(const std::string &_data)> ReadCallback;
81 
/// \brief Start reading, delivering results to a callback.
/// \param[in] _cb Callback that receives string data.
/// NOTE(review): presumably drives the private ReadLoop(); confirm.
85  public: void StartRead(const ReadCallback &_cb);
86 
/// \brief Counterpart of StartRead(). Presumably stops the read loop;
/// confirm against the implementation.
88  public: void StopRead();
89 
/// \brief Shut the connection down. NOTE(review): presumably the point at
/// which the shutdown event exposed by ConnectToShutdown() fires; confirm.
91  public: void Shutdown();
92 
/// \brief Query whether the connection is open. AsyncRead() refuses to
/// start a read when this returns false.
/// \return true when open.
95  public: bool IsOpen() const;
96 
/// \brief Close the connection. Private; OnReadHeader() calls it when the
/// header read reports an error.
98  private: void Close();
99 
/// \brief Cancel. NOTE(review): presumably cancels outstanding
/// asynchronous operations on the socket; confirm.
101  public: void Cancel();
102 
/// \brief Read data into a caller-supplied string.
/// \param[out] _data Receives the data (passed by non-const reference).
/// \return Presumably true on success; the implementation is not visible
/// in this header.
106  public: bool Read(std::string &_data);
107 
/// \brief Queue a message for writing.
/// \param[in] _buffer The message bytes.
/// \param[in] _force Defaults to false. NOTE(review): presumably requests
/// an immediate write instead of waiting for ProcessWriteQueue(); confirm.
112  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
113 
// The accessors below are declared only; their definitions are not in this
// header. Descriptions follow the declared names and return types.

/// \brief Get the local URI as a string.
116  public: std::string GetLocalURI() const;
117 
/// \brief Get the remote URI as a string.
120  public: std::string GetRemoteURI() const;
121 
/// \brief Get the local address as a string.
124  public: std::string GetLocalAddress() const;
125 
/// \brief Get the local port number.
128  public: unsigned int GetLocalPort() const;
129 
/// \brief Get the remote address as a string.
132  public: std::string GetRemoteAddress() const;
133 
/// \brief Get the remote port number.
136  public: unsigned int GetRemotePort() const;
137 
/// \brief Get the remote hostname as a string.
140  public: std::string GetRemoteHostname() const;
141 
/// \brief Get the local hostname as a string.
144  public: std::string GetLocalHostname() const;
145 
148  public: template<typename Handler>
149  void AsyncRead(Handler _handler)
150  {
151  if (!this->IsOpen())
152  {
153  gzerr << "AsyncRead on a closed socket\n";
154  return;
155  }
156 
157  void (Connection::*f)(const boost::system::error_code &,
158  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
159 
160  this->inboundHeader.resize(HEADER_LENGTH);
161  boost::asio::async_read(*this->socket,
162  boost::asio::buffer(this->inboundHeader),
163  boost::bind(f, this,
164  boost::asio::placeholders::error,
165  boost::make_tuple(_handler)));
166  }
167 
175  private: template<typename Handler>
176  void OnReadHeader(const boost::system::error_code &_e,
177  boost::tuple<Handler> _handler)
178  {
179  if (_e)
180  {
181  if (_e.message() != "End of File")
182  {
183  this->Close();
184  // This will occur when the other side closes the
185  // connection
186  }
187  }
188  else
189  {
190  std::size_t inboundData_size = 0;
191  std::string header(&this->inboundHeader[0],
192  this->inboundHeader.size());
193  this->inboundHeader.clear();
194 
195  inboundData_size = this->ParseHeader(header);
196 
197  if (inboundData_size > 0)
198  {
199  // Start the asynchronous call to receive data
200  this->inboundData.resize(inboundData_size);
201 
202  void (Connection::*f)(const boost::system::error_code &e,
203  boost::tuple<Handler>) =
204  &Connection::OnReadData<Handler>;
205 
206  boost::asio::async_read(*this->socket,
207  boost::asio::buffer(this->inboundData),
208  boost::bind(f, this,
209  boost::asio::placeholders::error,
210  _handler));
211  }
212  else
213  {
214  gzerr << "Header is empty\n";
215  boost::get<0>(_handler)("");
216  // This code tries to read the header again. We should
217  // never get here.
218  // this->inboundHeader.resize(HEADER_LENGTH);
219 
220  // void (Connection::*f)(const boost::system::error_code &,
221  // boost::tuple<Handler>) =
222  // &Connection::OnReadHeader<Handler>;
223 
224  // boost::asio::async_read(*this->socket,
225  // boost::asio::buffer(this->inboundHeader),
226  // boost::bind(f, this,
227  // boost::asio::placeholders::error, _handler));
228  }
229  }
230  }
231 
239  private: template<typename Handler>
240  void OnReadData(const boost::system::error_code &_e,
241  boost::tuple<Handler> _handler)
242  {
243  if (_e)
244  gzerr << "Error Reading data!\n";
245 
246  // Inform caller that data has been received
247  std::string data(&this->inboundData[0],
248  this->inboundData.size());
249  this->inboundData.clear();
250 
251  if (data.empty())
252  gzerr << "OnReadData got empty data!!!\n";
253 
254  if (!_e && !transport::is_stopped())
255  {
256  boost::get<0>(_handler)(data);
257  }
258  }
259 
263  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
264  _subscriber)
265  { return this->shutdown.Connect(_subscriber); }
266 
270  public: void DisconnectShutdown(event::ConnectionPtr _subscriber)
271  {this->shutdown.Disconnect(_subscriber);}
272 
/// \brief Process the write queue. NOTE(review): presumably sends the
/// strings accumulated by EnqueueMsg(); the implementation is not visible
/// in this header.
274  public: void ProcessWriteQueue();
275 
/// \brief Get this connection's id.
/// \return The id as an unsigned int.
278  public: unsigned int GetId() const;
279 
/// \brief Check an IP address string. Static: needs no instance.
/// \param[in] _ip String to check.
/// \return Presumably true when _ip is a valid address; the accepted
/// formats are not visible in this header.
283  public: static bool ValidateIP(const std::string &_ip);
284 
// Private helpers. All are declared only; descriptions follow the
// declared signatures and the uses visible in this header.

/// \brief Write-completion callback.
/// \param[in] e Result of the write.
/// \param[in] _b Stream buffer associated with the write.
/// NOTE(review): whether this function releases _b is not visible here.
288  private: void OnWrite(const boost::system::error_code &e,
289  boost::asio::streambuf *_b);
290 
/// \brief Accept-completion callback.
/// \param[in] _e Result of the accept.
293  private: void OnAccept(const boost::system::error_code &_e);
294 
/// \brief Convert a raw header string into a size. OnReadHeader() uses the
/// result as the number of payload chars to read; zero is treated as an
/// empty header.
/// \param[in] _header The raw header text.
/// \return The payload size.
297  private: std::size_t ParseHeader(const std::string &_header);
298 
/// \brief Read loop taking a ReadCallback. NOTE(review): presumably the
/// body run on behalf of StartRead(); confirm.
/// \param[in] _cb Callback that receives string data.
300  private: void ReadLoop(const ReadCallback &_cb);
301 
/// \brief Get the local TCP endpoint.
304  private: boost::asio::ip::tcp::endpoint GetLocalEndpoint() const;
305 
/// \brief Get the remote TCP endpoint.
308  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
309 
/// \brief Get a hostname string for an endpoint. Static: needs no
/// instance.
/// \param[in] _ep The endpoint (passed by value).
312  private: static std::string GetHostname(
313  boost::asio::ip::tcp::endpoint _ep);
314 
/// \brief Connect-completion callback.
/// \param[in] _error Result of the connect attempt.
/// \param[in] _endPointIter Resolver iterator. NOTE(review): presumably
/// the endpoint that was tried or the next one to try; confirm.
318  private: void OnConnect(const boost::system::error_code &_error,
319  boost::asio::ip::tcp::resolver::iterator _endPointIter);
320 
// Data members. Where a member is not used by code visible in this header
// its role is inferred from its name and marked as such.

/// \brief TCP socket; the asynchronous reads in AsyncRead() and
/// OnReadHeader() read from it. NOTE(review): raw pointer, ownership is
/// not visible here.
322  private: boost::asio::ip::tcp::socket *socket;
323 
/// \brief TCP acceptor, presumably used by Listen(). NOTE(review): raw
/// pointer, ownership is not visible here.
325  private: boost::asio::ip::tcp::acceptor *acceptor;
326 
/// \brief Queue of strings, presumably the outgoing messages added by
/// EnqueueMsg().
328  private: std::deque<std::string> writeQueue;
329 
/// \brief Mutex, presumably paired with connectCondition.
331  private: boost::mutex *connectMutex;
332 
/// \brief Recursive mutex, presumably guarding writes.
334  private: boost::recursive_mutex *writeMutex;
335 
/// \brief Recursive mutex, presumably guarding reads.
337  private: boost::recursive_mutex *readMutex;
338 
/// \brief Condition variable, presumably signalled when a connect attempt
/// completes.
340  private: boost::condition_variable connectCondition;
341 
/// \brief Stored accept callback, presumably the one passed to Listen().
343  private: AcceptCallback acceptCB;
344 
/// \brief Buffer for an incoming header. AsyncRead() resizes it to
/// HEADER_LENGTH; OnReadHeader() copies it out and clears it.
346  private: std::vector<char> inboundHeader;
347 
/// \brief Buffer for an incoming payload. OnReadHeader() sizes it from the
/// parsed header; OnReadData() copies it out and clears it.
349  private: std::vector<char> inboundData;
350 
/// \brief Flag, presumably telling the read loop to stop.
352  private: bool readQuit;
353 
/// \brief Id of this connection, presumably the value returned by GetId().
355  private: unsigned int id;
356 
/// \brief Static counter shared by all connections, presumably the source
/// of new ids.
358  private: static unsigned int idCounter;
359 
/// \brief Connection handle, presumably the connection currently being
/// accepted.
361  private: ConnectionPtr acceptConn;
362 
/// \brief Shutdown event; ConnectToShutdown() and DisconnectShutdown()
/// subscribe to and unsubscribe from it.
364  private: event::EventT<void()> shutdown;
365 
/// \brief Static pointer to an IOManager shared by all connections.
/// NOTE(review): ownership and initialisation are not visible here.
367  private: static IOManager *iomanager;
368 
/// \brief Write counter; its exact meaning is not visible in this header.
370  private: unsigned int writeCount;
371 
/// \brief Local URI string, presumably what GetLocalURI() returns.
373  private: std::string localURI;
374 
/// \brief Local address string, presumably what GetLocalAddress() returns.
376  private: std::string localAddress;
377 
/// \brief Remote URI string, presumably what GetRemoteURI() returns.
379  private: std::string remoteURI;
380 
/// \brief Remote address string, presumably what GetRemoteAddress()
/// returns.
382  private: std::string remoteAddress;
383 
/// \brief Flag, presumably set when a connect attempt fails.
385  private: bool connectError;
386  };
388  }
389 }
390 #endif