All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
19 
20 #include <tbb/task.h>
21 #include <google/protobuf/message.h>
22 
23 #include <boost/asio.hpp>
24 #include <boost/bind.hpp>
25 #include <boost/function.hpp>
26 #include <boost/thread.hpp>
27 #include <boost/tuple/tuple.hpp>
28 
29 #include <string>
30 #include <vector>
31 #include <iostream>
32 #include <iomanip>
33 #include <deque>
34 
35 
36 #include "common/Event.hh"
37 #include "common/Console.hh"
38 #include "common/Exception.hh"
39 
// Number of chars in the fixed-size header that Connection::AsyncRead reads
// before each payload (inboundHeader is resized to this value).
40 #define HEADER_LENGTH 8
41 
42 namespace gazebo
43 {
44  namespace transport
45  {
// Declared here, defined elsewhere; Connection::OnReadData checks it before
// dispatching a received message.
46  extern bool is_stopped();
47 
// Forward declarations of classes used below.
48  class IOManager;
49  class Connection;
// Shared-pointer handle type for Connection objects.
50  typedef boost::shared_ptr<Connection> ConnectionPtr;
51 
55  class ConnectionReadTask : public tbb::task
56  {
62  boost::function<void (const std::string &)> _func,
63  const std::string &_data)
64  {
65  this->func = _func;
66  this->data = _data;
67  }
68 
71  public: tbb::task *execute()
72  {
73  this->func(this->data);
74  return NULL;
75  }
76 
78  private: boost::function<void (const std::string &)> func;
79 
81  private: std::string data;
82  };
84 
87 
90  class Connection : public boost::enable_shared_from_this<Connection>
91  {
93  public: Connection();
94 
96  public: virtual ~Connection();
97 
102  public: bool Connect(const std::string &_host, unsigned int _port);
103 
105  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
106 
111  public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
112 
114  typedef boost::function<void(const std::string &_data)> ReadCallback;
115 
119  public: void StartRead(const ReadCallback &_cb);
120 
122  public: void StopRead();
123 
125  public: void Shutdown();
126 
129  public: bool IsOpen() const;
130 
132  private: void Close();
133 
135  public: void Cancel();
136 
140  public: bool Read(std::string &_data);
141 
146  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
147 
150  public: std::string GetLocalURI() const;
151 
154  public: std::string GetRemoteURI() const;
155 
158  public: std::string GetLocalAddress() const;
159 
162  public: unsigned int GetLocalPort() const;
163 
166  public: std::string GetRemoteAddress() const;
167 
170  public: unsigned int GetRemotePort() const;
171 
174  public: std::string GetRemoteHostname() const;
175 
178  public: static std::string GetLocalHostname();
179 
182  public: template<typename Handler>
183  void AsyncRead(Handler _handler)
184  {
185  if (!this->IsOpen())
186  {
187  gzerr << "AsyncRead on a closed socket\n";
188  return;
189  }
190 
191  void (Connection::*f)(const boost::system::error_code &,
192  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
193 
194  this->inboundHeader.resize(HEADER_LENGTH);
195  boost::asio::async_read(*this->socket,
196  boost::asio::buffer(this->inboundHeader),
197  boost::bind(f, this,
198  boost::asio::placeholders::error,
199  boost::make_tuple(_handler)));
200  }
201 
209  private: template<typename Handler>
210  void OnReadHeader(const boost::system::error_code &_e,
211  boost::tuple<Handler> _handler)
212  {
213  if (_e)
214  {
215  if (_e.message() != "End of File")
216  {
217  // This will occur when the other side closes the
218  // connection. We don't want spew error messages in this
219  // case.
220  //
221  // It's okay to do nothing here.
222  }
223  }
224  else
225  {
226  std::size_t inboundData_size = 0;
227  std::string header(&this->inboundHeader[0],
228  this->inboundHeader.size());
229  this->inboundHeader.clear();
230 
231  inboundData_size = this->ParseHeader(header);
232 
233  if (inboundData_size > 0)
234  {
235  // Start the asynchronous call to receive data
236  this->inboundData.resize(inboundData_size);
237 
238  void (Connection::*f)(const boost::system::error_code &e,
239  boost::tuple<Handler>) =
240  &Connection::OnReadData<Handler>;
241 
242  boost::asio::async_read(*this->socket,
243  boost::asio::buffer(this->inboundData),
244  boost::bind(f, this,
245  boost::asio::placeholders::error,
246  _handler));
247  }
248  else
249  {
250  gzerr << "Header is empty\n";
251  boost::get<0>(_handler)("");
252  // This code tries to read the header again. We should
253  // never get here.
254  // this->inboundHeader.resize(HEADER_LENGTH);
255 
256  // void (Connection::*f)(const boost::system::error_code &,
257  // boost::tuple<Handler>) =
258  // &Connection::OnReadHeader<Handler>;
259 
260  // boost::asio::async_read(*this->socket,
261  // boost::asio::buffer(this->inboundHeader),
262  // boost::bind(f, this,
263  // boost::asio::placeholders::error, _handler));
264  }
265  }
266  }
267 
275  private: template<typename Handler>
276  void OnReadData(const boost::system::error_code &_e,
277  boost::tuple<Handler> _handler)
278  {
279  if (_e)
280  {
281  gzerr << "Error Reading data["
282  << _e.message() << "]\n";
283  }
284 
285  // Inform caller that data has been received
286  std::string data(&this->inboundData[0],
287  this->inboundData.size());
288  this->inboundData.clear();
289 
290  if (data.empty())
291  gzerr << "OnReadData got empty data!!!\n";
292 
293  if (!_e && !transport::is_stopped())
294  {
295  ConnectionReadTask *task = new(tbb::task::allocate_root())
296  ConnectionReadTask(boost::get<0>(_handler), data);
297 
298  tbb::task::enqueue(*task);
299  }
300  }
301 
305  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
306  _subscriber)
307  { return this->shutdown.Connect(_subscriber); }
308 
312  public: void DisconnectShutdown(event::ConnectionPtr _subscriber)
313  {this->shutdown.Disconnect(_subscriber);}
314 
316  public: void ProcessWriteQueue(bool _blocking = false);
317 
320  public: unsigned int GetId() const;
321 
325  public: static bool ValidateIP(const std::string &_ip);
326 
330  private: void OnWrite(const boost::system::error_code &_e,
331  boost::asio::streambuf *_b);
332 
335  private: void OnAccept(const boost::system::error_code &_e);
336 
339  private: std::size_t ParseHeader(const std::string &_header);
340 
342  private: void ReadLoop(const ReadCallback &_cb);
343 
346  private: static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
347 
350  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
351 
354  private: static std::string GetHostname(
355  boost::asio::ip::tcp::endpoint _ep);
356 
360  private: void OnConnect(const boost::system::error_code &_error,
361  boost::asio::ip::tcp::resolver::iterator _endPointIter);
362 
364  private: boost::asio::ip::tcp::socket *socket;
365 
367  private: boost::asio::ip::tcp::acceptor *acceptor;
368 
370  private: std::deque<std::string> writeQueue;
371 
373  private: boost::mutex connectMutex;
374 
376  private: boost::recursive_mutex writeMutex;
377 
379  private: boost::recursive_mutex readMutex;
380 
382  private: mutable boost::mutex socketMutex;
383 
385  private: boost::condition_variable connectCondition;
386 
388  private: AcceptCallback acceptCB;
389 
391  private: std::vector<char> inboundHeader;
392 
394  private: std::vector<char> inboundData;
395 
397  private: bool readQuit;
398 
400  private: unsigned int id;
401 
403  private: static unsigned int idCounter;
404 
406  private: ConnectionPtr acceptConn;
407 
409  private: event::EventT<void()> shutdown;
410 
412  private: static IOManager *iomanager;
413 
415  private: unsigned int writeCount;
416 
418  private: std::string localURI;
419 
421  private: std::string localAddress;
422 
424  private: std::string remoteURI;
425 
427  private: std::string remoteAddress;
428 
430  private: bool connectError;
431  };
433  }
434 }
435 #endif