hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject hadoop git commit: HDFS-9523. libhdfs++: failure to connect to ipv6 host causes CI unit tests to fail. Contributed by Bob Hansen.
Date Wed, 16 Dec 2015 17:27:40 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 522610d95 -> 9b280cc29


HDFS-9523. libhdfs++: failure to connect to ipv6 host causes CI unit tests to fail.  Contributed
by Bob Hansen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9b280cc2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9b280cc2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9b280cc2

Branch: refs/heads/HDFS-8707
Commit: 9b280cc29b5951576bcb0ceb1a2c922dd6d282a9
Parents: 522610d
Author: James <jhc@apache.org>
Authored: Wed Dec 16 12:27:06 2015 -0500
Committer: James <jhc@apache.org>
Committed: Wed Dec 16 12:27:06 2015 -0500

----------------------------------------------------------------------
 .../main/native/libhdfspp/lib/fs/filesystem.cc  |  2 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.h   | 87 ++++++++++++++------
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 14 ++--
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  | 14 ++--
 .../native/libhdfspp/tests/rpc_engine_test.cc   | 17 +++-
 5 files changed, 93 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b280cc2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index 1a1163b..fafaa1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -49,7 +49,7 @@ void NameNodeOperations::Connect(const std::string &server,
   m->Push(Resolve(io_service_, server, service,
                   std::back_inserter(m->state())))
       .Push(Bind([this, m](const Continuation::Next &next) {
-        engine_.Connect(m->state().front(), next);
+        engine_.Connect(m->state(), next);
       }));
   m->Run([this, handler](const Status &status, const State &) {
     handler(status);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b280cc2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 76bbf2e..26946bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -33,10 +33,10 @@ template <class NextLayer>
 class RpcConnectionImpl : public RpcConnection {
 public:
   RpcConnectionImpl(RpcEngine *engine);
-  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                        RpcCallback &handler);
   virtual void ConnectAndFlush(
-      const ::asio::ip::tcp::endpoint &server) override;
+      const std::vector<::asio::ip::tcp::endpoint> &server) override;
   virtual void Handshake(RpcCallback &handler) override;
   virtual void Disconnect() override;
   virtual void OnSendCompleted(const ::asio::error_code &ec,
@@ -52,7 +52,11 @@ public:
 
  private:
   const Options options_;
+  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
   NextLayer next_layer_;
+
+  void ConnectComplete(const ::asio::error_code &ec);
+  void HandshakeComplete(const Status &s);
 };
 
 template <class NextLayer>
@@ -63,7 +67,7 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
 
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Connect(
-    const ::asio::ip::tcp::endpoint &server, RpcCallback &handler) {
+    const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler)
{
   auto connectionSuccessfulReq = std::make_shared<Request>(
       engine_, [handler](::google::protobuf::io::CodedInputStream *is,
                          const Status &status) {
@@ -76,29 +80,66 @@ void RpcConnectionImpl<NextLayer>::Connect(
 
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
-    const ::asio::ip::tcp::endpoint &server) {
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-  next_layer_.async_connect(server,
-                            [shared_this, this](const ::asio::error_code &ec) {
-                              std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-                              Status status = ToStatus(ec);
-                              if (status.ok()) {
-                                StartReading();
-                                Handshake([shared_this, this](const Status &s) {
-                                  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-                                  if (s.ok()) {
-                                    FlushPendingRequests();
-                                  } else {
-                                    CommsError(s);
-                                  };
-                                });
-                              } else {
-                                CommsError(status);
-                              }
-                            });
+    const std::vector<::asio::ip::tcp::endpoint> &server) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  if (server.empty()) {
+    Status s = Status::InvalidArgument("No endpoints provided");
+    CommsError(s);
+    return;
+  }
+
+  // Take the first endpoint, but remember the alternatives for later
+  additional_endpoints_ = server;
+  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
+  additional_endpoints_.erase(additional_endpoints_.begin());
+
+  auto shared_this = shared_from_this();
+  next_layer_.async_connect(first_endpoint, [shared_this, this](const ::asio::error_code
&ec) {
+    ConnectComplete(ec);
+  });
+}
+
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
{
+  auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  Status status = ToStatus(ec);
+  if (status.ok()) {
+    StartReading();
+    Handshake([shared_this, this](const Status & s) {
+      HandshakeComplete(s);
+    });
+  } else {
+    next_layer_.close();
+    if (!additional_endpoints_.empty()) {
+      // If we have additional endpoints, keep trying until we either run out or
+      //    hit one
+      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
+      additional_endpoints_.erase(additional_endpoints_.begin());
+
+      next_layer_.async_connect(next_endpoint, [shared_this, this](const ::asio::error_code
&ec) {
+        ConnectComplete(ec);
+      });
+    } else {
+      CommsError(status);
+    }
+  }
 }
 
 template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::HandshakeComplete(const Status &s) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  if (s.ok()) {
+    FlushPendingRequests();
+  } else {
+    CommsError(s);
+  };
+}
+
+
+template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Handshake(RpcCallback &handler) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b280cc2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index c779b1c..a84257b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -39,13 +39,13 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
       call_id_(0),
       retry_timer(*io_service) {}
 
-void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server,
+void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                         RpcCallback &handler) {
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
-  last_endpoint_ = server;
+  last_endpoints_ = server;
 
   conn_ = NewConnection();
-  conn_->Connect(server, handler);
+  conn_->Connect(last_endpoints_, handler);
 }
 
 void RpcEngine::Shutdown() {
@@ -75,7 +75,7 @@ void RpcEngine::AsyncRpc(
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
   if (!conn_) {
     conn_ = NewConnection();
-    conn_->ConnectAndFlush(last_endpoint_);
+    conn_->ConnectAndFlush(last_endpoints_);
   }
   conn_->AsyncRpc(method_name, req, resp, handler);
 }
@@ -103,7 +103,7 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string
&req,
     std::lock_guard<std::mutex> state_lock(engine_state_lock_);
     if (!conn_) {
         conn_ = NewConnection();
-        conn_->ConnectAndFlush(last_endpoint_);
+        conn_->ConnectAndFlush(last_endpoints_);
       }
     conn = conn_;
   }
@@ -170,10 +170,10 @@ void RpcEngine::RpcCommsError(
       retry_timer.expires_from_now(
           std::chrono::milliseconds(options_.rpc_retry_delay_ms));
       retry_timer.async_wait([this](asio::error_code ec) {
-        if (!ec) conn_->ConnectAndFlush(last_endpoint_);
+        if (!ec) conn_->ConnectAndFlush(last_endpoints_);
       });
     } else {
-      conn_->ConnectAndFlush(last_endpoint_);
+      conn_->ConnectAndFlush(last_endpoints_);
     }
   } else {
     // Connection will try again if someone calls AsyncRpc

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b280cc2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index 770e163..e6beef6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -110,9 +110,11 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection>
{
   RpcConnection(LockFreeRpcEngine *engine);
   virtual ~RpcConnection();
 
-  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
+  // Note that a single server can have multiple endpoints - especially both
+  //   an ipv4 and ipv6 endpoint
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                        RpcCallback &handler) = 0;
-  virtual void ConnectAndFlush(const ::asio::ip::tcp::endpoint &server) = 0;
+  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server)
= 0;
   virtual void Handshake(RpcCallback &handler) = 0;
   virtual void Disconnect() = 0;
 
@@ -231,7 +233,7 @@ class RpcEngine : public LockFreeRpcEngine {
             const std::string &client_name, const char *protocol_name,
             int protocol_version);
 
-  void Connect(const ::asio::ip::tcp::endpoint &server, RpcCallback &handler);
+  void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback
&handler);
 
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
@@ -272,6 +274,9 @@ class RpcEngine : public LockFreeRpcEngine {
   std::shared_ptr<RpcConnection> conn_;
   virtual std::shared_ptr<RpcConnection> NewConnection();
   virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options &options);
+
+  // Remember all of the last endpoints in case we need to reconnect and retry
+  std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
 private:
   ::asio::io_service * const io_service_;
   const Options options_;
@@ -282,9 +287,6 @@ private:
   std::atomic_int call_id_;
   ::asio::deadline_timer retry_timer;
 
-  // Remember the last endpoint in case we need to reconnect to retry
-  ::asio::ip::tcp::endpoint last_endpoint_;
-
   std::mutex engine_state_lock_;
 
 };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b280cc2/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index 71e3978..28c7596 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -43,6 +43,12 @@ namespace pbio = ::google::protobuf::io;
 
 namespace hdfs {
 
+std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> make_endpoint() {
+  std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> result;
+  result.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
+  return result;
+}
+
 class MockRPCConnection : public MockConnectionBase {
  public:
   MockRPCConnection(::asio::io_service &io_service)
@@ -61,6 +67,9 @@ class SharedConnectionEngine : public RpcEngine {
 
 protected:
   std::shared_ptr<RpcConnection> NewConnection() override {
+    // Stuff in some dummy endpoints so we don't error out
+    last_endpoints_ = make_endpoint();
+
     return std::make_shared<RpcConnectionImpl<SharedMockRPCConnection>>(this);
   }
 
@@ -257,7 +266,7 @@ TEST(RpcEngineTest, TestConnectionFailure)
   EXPECT_CALL(*producer, Produce())
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const
Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat)
{
     complete = true;
     io_service.stop();
     ASSERT_FALSE(stat.ok());
@@ -285,7 +294,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const
Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat)
{
     complete = true;
     io_service.stop();
     ASSERT_FALSE(stat.ok());
@@ -313,7 +322,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
       .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const
Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat)
{
     complete = true;
     io_service.stop();
     ASSERT_TRUE(stat.ok());
@@ -342,7 +351,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
       .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const
Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat)
{
     complete = true;
     io_service.stop();
     ASSERT_TRUE(stat.ok());


Mime
View raw message