hadoop-common-commits mailing list archives

From inigo...@apache.org
Subject [32/50] [abbrv] hadoop git commit: HDFS-11538. Move ClientProtocol HA proxies into hadoop-hdfs-client. Contributed by Huafeng Wang.
Date Mon, 17 Apr 2017 17:58:29 GMT
HDFS-11538. Move ClientProtocol HA proxies into hadoop-hdfs-client. Contributed by Huafeng Wang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c58c01fe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c58c01fe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c58c01fe

Branch: refs/heads/HDFS-10467
Commit: c58c01fe043c0cf7182eccef1b7205adfa091201
Parents: 11406a4
Author: Andrew Wang <wang@apache.org>
Authored: Tue Apr 4 23:05:24 2017 -0700
Committer: Inigo <inigoiri@apache.org>
Committed: Thu Apr 6 18:58:22 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |  13 +
 .../org/apache/hadoop/hdfs/HAUtilClient.java    |  55 +++
 .../hadoop/hdfs/NameNodeProxiesClient.java      |  15 +-
 .../hdfs/client/HdfsClientConfigKeys.java       |   1 +
 .../namenode/ha/ClientHAProxyFactory.java       |  44 ++
 .../ha/ConfiguredFailoverProxyProvider.java     | 183 +++++++
 .../hdfs/server/namenode/ha/HAProxyFactory.java |  44 ++
 .../namenode/ha/IPFailoverProxyProvider.java    | 126 +++++
 .../ha/RequestHedgingProxyProvider.java         | 234 +++++++++
 .../ha/TestRequestHedgingProxyProvider.java     | 476 +++++++++++++++++++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   3 +-
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  15 +-
 .../java/org/apache/hadoop/hdfs/HAUtil.java     |  57 +--
 .../org/apache/hadoop/hdfs/NameNodeProxies.java |   3 +-
 .../hadoop/hdfs/server/namenode/DfsServlet.java |  29 +-
 .../ha/ConfiguredFailoverProxyProvider.java     | 216 ---------
 .../namenode/ha/IPFailoverProxyProvider.java    | 132 -----
 .../namenode/ha/NameNodeHAProxyFactory.java     |  45 ++
 .../ha/RequestHedgingProxyProvider.java         | 241 ----------
 .../hadoop/hdfs/TestDFSClientFailover.java      |   4 +-
 .../org/apache/hadoop/hdfs/TestDFSUtil.java     |   2 +-
 .../namenode/ha/TestDelegationTokensWithHA.java |   4 +-
 .../ha/TestRequestHedgingProxyProvider.java     | 470 ------------------
 23 files changed, 1247 insertions(+), 1165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index f9b2e8d..2e770cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -170,6 +170,19 @@ public class DFSUtilClient {
   }
 
   /**
+   * Returns a map of nameservice ID to a map of namenode ID to the RPC
+   * address of each HA NN, from the configuration.
+   *
+   * @param conf configuration
+   * @return map of nameservice ID to map of namenode ID to InetSocketAddress
+   */
+  public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
+      Configuration conf) {
+    return DFSUtilClient.getAddresses(conf, null,
+      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+  }
+
+  /**
    * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
    * the configuration.
    *

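For illustration only (not part of the commit), a minimal sketch of consuming the relocated helper; it assumes dfs.nameservices and the per-namenode dfs.namenode.rpc-address keys are already set, and the class name is hypothetical:

import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;

public class PrintHaNnRpcAddresses {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Nameservice ID -> (namenode ID -> RPC address).
    Map<String, Map<String, InetSocketAddress>> addrs =
        DFSUtilClient.getHaNnRpcAddresses(conf);
    for (Map.Entry<String, Map<String, InetSocketAddress>> ns
        : addrs.entrySet()) {
      for (Map.Entry<String, InetSocketAddress> nn
          : ns.getValue().entrySet()) {
        System.out.println(ns.getKey() + "." + nn.getKey()
            + " -> " + nn.getValue());
      }
    }
  }
}
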
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java
index 9f28cfc..47288f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HAUtilClient.java
@@ -20,15 +20,29 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Collection;
 
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
+import static org.apache.hadoop.security.SecurityUtil.buildTokenService;
 
 @InterfaceAudience.Private
 public class HAUtilClient {
+  private static final Logger LOG = LoggerFactory.getLogger(HAUtilClient.class);
+
+  private static final DelegationTokenSelector tokenSelector =
+      new DelegationTokenSelector();
+
   /**
    * @return true if the given nameNodeUri appears to be a logical URI.
    */
@@ -92,4 +106,45 @@ public class HAUtilClient {
   public static boolean isTokenForLogicalUri(Token<?> token) {
     return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
   }
+
+  /**
+   * Locate a delegation token associated with the given HA cluster URI, and if
+   * one is found, clone it to also represent the underlying namenode address.
+   * @param ugi the UGI to modify
+   * @param haUri the logical URI for the cluster
+   * @param nnAddrs collection of NNs in the cluster to which the token
+   * applies
+   */
+  public static void cloneDelegationTokenForLogicalUri(
+      UserGroupInformation ugi, URI haUri,
+      Collection<InetSocketAddress> nnAddrs) {
+    // this cloning logic is only used by hdfs
+    Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
+        HdfsConstants.HDFS_URI_SCHEME);
+    Token<DelegationTokenIdentifier> haToken =
+        tokenSelector.selectToken(haService, ugi.getTokens());
+    if (haToken != null) {
+      for (InetSocketAddress singleNNAddr : nnAddrs) {
+        // this is a minor hack to prevent physical HA tokens from being
+        // exposed to the user via UGI.getCredentials(), otherwise these
+        // cloned tokens may be inadvertently propagated to jobs
+        Token<DelegationTokenIdentifier> specificToken =
+            haToken.privateClone(buildTokenService(singleNNAddr));
+        Text alias = new Text(
+            HAUtilClient.buildTokenServicePrefixForLogicalUri(
+                HdfsConstants.HDFS_URI_SCHEME)
+                + "//" + specificToken.getService());
+        ugi.addToken(alias, specificToken);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Mapped HA service delegation token for logical URI " +
+              haUri + " to namenode " + singleNNAddr);
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No HA service delegation token found for logical URI " +
+            haUri);
+      }
+    }
+  }
 }

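For illustration only, a hedged sketch of invoking the relocated cloning helper directly, with hypothetical host names; ConfiguredFailoverProxyProvider (added below) makes the same call during construction:

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;

import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.security.UserGroupInformation;

public class CloneHaTokenSketch {
  public static void main(String[] args) throws Exception {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    URI haUri = URI.create("hdfs://mycluster"); // hypothetical logical URI
    Collection<InetSocketAddress> nnAddrs = Arrays.asList(
        new InetSocketAddress("machine1.foo.bar", 9820),
        new InetSocketAddress("machine2.foo.bar", 9820));
    // Clone any token held for the logical URI onto each physical NN
    // address so the IPC layer can find it.
    HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
  }
}
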
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
index 5ca7030..a092f02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -212,6 +214,14 @@ public class NameNodeProxiesClient {
   public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
       Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
       AtomicBoolean fallbackToSimpleAuth) throws IOException {
+    return createFailoverProxyProvider(conf, nameNodeUri, xface, checkPort,
+      fallbackToSimpleAuth, new ClientHAProxyFactory<T>());
+  }
+
+  protected static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
+      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
+      AtomicBoolean fallbackToSimpleAuth, HAProxyFactory<T> proxyFactory)
+      throws IOException {
     Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
     AbstractNNFailoverProxyProvider<T> providerNN;
     try {
@@ -223,9 +233,10 @@ public class NameNodeProxiesClient {
       }
       // Create a proxy provider instance.
       Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
-          .getConstructor(Configuration.class, URI.class, Class.class);
+          .getConstructor(Configuration.class, URI.class,
+              Class.class, HAProxyFactory.class);
       FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
-          xface);
+          xface, proxyFactory);
 
       // If the proxy provider is of an old implementation, wrap it.
       if (!(provider instanceof AbstractNNFailoverProxyProvider)) {

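Since the reflective lookup above now resolves a (Configuration, URI, Class, HAProxyFactory) constructor, a pluggable failover proxy provider has to expose that shape. A minimal, hypothetical subclass:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;

// Hypothetical provider showing the constructor the reflection expects.
public class CustomFailoverProxyProvider<T>
    extends ConfiguredFailoverProxyProvider<T> {
  public CustomFailoverProxyProvider(Configuration conf, URI uri,
      Class<T> xface, HAProxyFactory<T> factory) {
    super(conf, uri, xface, factory);
  }
}
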
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 1a38806..c152a4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -67,6 +67,7 @@ public interface HdfsClientConfigKeys {
 
   String PREFIX = "dfs.client.";
   String  DFS_NAMESERVICES = "dfs.nameservices";
+  String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
   int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
   String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
   int     DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;

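The relocated key takes the same per-nameservice, per-namenode suffixes as before. For illustration, a sketch mirroring the setup used in TestRequestHedgingProxyProvider below, with hypothetical nameservice and host names:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class HaAddressConfSketch {
  static Configuration buildConf() {
    Configuration conf = new Configuration();
    conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, "mycluster");
    conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".mycluster",
        "nn1,nn2");
    conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY
        + ".mycluster.nn1", "machine1.foo.bar:9820");
    conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY
        + ".mycluster.nn2", "machine2.foo.bar:9820");
    return conf;
  }
}
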
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java
new file mode 100644
index 0000000..b887d87
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ClientHAProxyFactory<T> implements HAProxyFactory<T> {
+  @Override
+  @SuppressWarnings("unchecked")
+  public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+      Class<T> xface, UserGroupInformation ugi, boolean withRetries,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+    return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
+      nnAddr, conf, ugi, false, fallbackToSimpleAuth);
+  }
+
+  @Override
+  public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+      Class<T> xface, UserGroupInformation ugi, boolean withRetries)
+      throws IOException {
+    return createProxy(conf, nnAddr, xface, ugi, withRetries, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
new file mode 100644
index 0000000..e9c8791
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.HAUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A FailoverProxyProvider implementation which allows one to configure
+ * multiple addresses to connect to during fail-over. When random order is
+ * enabled, the address list is shuffled so that a random address is tried
+ * first; on a fail-over event the remaining addresses are tried in turn.
+ */
+public class ConfiguredFailoverProxyProvider<T> extends
+    AbstractNNFailoverProxyProvider<T> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class);
+
+  protected final Configuration conf;
+  protected final List<AddressRpcProxyPair<T>> proxies =
+      new ArrayList<AddressRpcProxyPair<T>>();
+  private final UserGroupInformation ugi;
+  protected final Class<T> xface;
+
+  private int currentProxyIndex = 0;
+  private final HAProxyFactory<T> factory;
+
+  public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
+      Class<T> xface, HAProxyFactory<T> factory) {
+    this.xface = xface;
+    this.conf = new Configuration(conf);
+    int maxRetries = this.conf.getInt(
+        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
+        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
+    this.conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        maxRetries);
+
+    int maxRetriesOnSocketTimeouts = this.conf.getInt(
+        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
+    this.conf.setInt(
+            CommonConfigurationKeysPublic
+                    .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+            maxRetriesOnSocketTimeouts);
+
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+
+      Map<String, Map<String, InetSocketAddress>> map =
+          DFSUtilClient.getHaNnRpcAddresses(conf);
+      Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
+
+      if (addressesInNN == null || addressesInNN.size() == 0) {
+        throw new RuntimeException("Could not find any configured addresses " +
+            "for URI " + uri);
+      }
+
+      Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
+      for (InetSocketAddress address : addressesOfNns) {
+        proxies.add(new AddressRpcProxyPair<T>(address));
+      }
+      // Randomize the list to prevent all clients pointing to the same one
+      boolean randomized = conf.getBoolean(
+          HdfsClientConfigKeys.Failover.RANDOM_ORDER,
+          HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
+      if (randomized) {
+        Collections.shuffle(proxies);
+      }
+
+      // The client may have a delegation token set for the logical
+      // URI of the cluster. Clone this token to apply to each of the
+      // underlying IPC addresses so that the IPC code can find it.
+      HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
+      this.factory = factory;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Class<T> getInterface() {
+    return xface;
+  }
+
+  /**
+   * Lazily initialize the RPC proxy object.
+   */
+  @Override
+  public synchronized ProxyInfo<T> getProxy() {
+    AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
+    if (current.namenode == null) {
+      try {
+        current.namenode = factory.createProxy(conf,
+            current.address, xface, ugi, false, getFallbackToSimpleAuth());
+      } catch (IOException e) {
+        LOG.error("Failed to create RPC proxy to NameNode", e);
+        throw new RuntimeException(e);
+      }
+    }
+    return new ProxyInfo<T>(current.namenode, current.address.toString());
+  }
+
+  @Override
+  public void performFailover(T currentProxy) {
+    incrementProxyIndex();
+  }
+
+  synchronized void incrementProxyIndex() {
+    currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
+  }
+
+  /**
+   * A small pair object storing an NN's address and the RPC proxy object
+   * connected to it. Note that {@link AddressRpcProxyPair#namenode} may be null.
+   */
+  private static class AddressRpcProxyPair<T> {
+    public final InetSocketAddress address;
+    public T namenode;
+
+    public AddressRpcProxyPair(InetSocketAddress address) {
+      this.address = address;
+    }
+  }
+
+  /**
+   * Close all the proxy objects which have been opened over the lifetime of
+   * this proxy provider.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    for (AddressRpcProxyPair<T> proxy : proxies) {
+      if (proxy.namenode != null) {
+        if (proxy.namenode instanceof Closeable) {
+          ((Closeable)proxy.namenode).close();
+        } else {
+          RPC.stopProxy(proxy.namenode);
+        }
+      }
+    }
+  }
+
+  /**
+   * Logical URI is required for this failover proxy provider.
+   */
+  @Override
+  public boolean useLogicalURI() {
+    return true;
+  }
+}

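For illustration, a sketch of opting in to the randomized ordering that the constructor above reads via HdfsClientConfigKeys.Failover.RANDOM_ORDER:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class RandomOrderSketch {
  static void enableRandomOrder(Configuration conf) {
    // Shuffle the proxy list so clients do not all try the same NN first.
    conf.setBoolean(HdfsClientConfigKeys.Failover.RANDOM_ORDER, true);
  }
}
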
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
new file mode 100644
index 0000000..f92a74f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This interface decouples the proxy creation implementation used in
+ * {@link AbstractNNFailoverProxyProvider}. The client side can use
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} to initialize the
+ * proxy, while the server side can use NamenodeProtocols.
+ */
+@InterfaceAudience.Private
+public interface HAProxyFactory<T> {
+
+  T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
+      UserGroupInformation ugi, boolean withRetries,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException;
+
+  T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
+      UserGroupInformation ugi, boolean withRetries) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
new file mode 100644
index 0000000..ed250a0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * An NNFailoverProxyProvider implementation which works in an IP failover setup.
+ * Only one proxy is used to connect to both servers and switching between
+ * the servers is done by the environment/infrastructure, which guarantees
+ * clients can consistently reach only one node at a time.
+ *
+ * Clients with a live connection will likely get connection reset after an
+ * IP failover. This case will be handled by the
+ * FailoverOnNetworkExceptionRetry retry policy. I.e. if the call is
+ * not idempotent, it won't get retried.
+ *
+ * A connection reset while setting up a connection (i.e. before sending a
+ * request) will be handled by the IPC client.
+ *
+ * The namenode URI must contain a resolvable host name.
+ */
+public class IPFailoverProxyProvider<T> extends
+    AbstractNNFailoverProxyProvider<T> {
+  private final Configuration conf;
+  private final Class<T> xface;
+  private final URI nameNodeUri;
+  private final HAProxyFactory<T> factory;
+  private ProxyInfo<T> nnProxyInfo = null;
+
+  public IPFailoverProxyProvider(Configuration conf, URI uri,
+      Class<T> xface, HAProxyFactory<T> factory) {
+    this.xface = xface;
+    this.nameNodeUri = uri;
+    this.factory = factory;
+
+    this.conf = new Configuration(conf);
+    int maxRetries = this.conf.getInt(
+        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
+        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
+    this.conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        maxRetries);
+
+    int maxRetriesOnSocketTimeouts = this.conf.getInt(
+        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
+    this.conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+        maxRetriesOnSocketTimeouts);
+  }
+
+  @Override
+  public Class<T> getInterface() {
+    return xface;
+  }
+
+  @Override
+  public synchronized ProxyInfo<T> getProxy() {
+    // Create a non-ha proxy if not already created.
+    if (nnProxyInfo == null) {
+      try {
+        // Create a proxy that is not wrapped in RetryProxy
+        InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
+        nnProxyInfo = new ProxyInfo<T>(factory.createProxy(conf, nnAddr, xface,
+          UserGroupInformation.getCurrentUser(), false), nnAddr.toString());
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+    }
+    return nnProxyInfo;
+  }
+
+  /** Nothing to do for IP failover */
+  @Override
+  public void performFailover(T currentProxy) {
+  }
+
+  /**
+   * Close the proxy.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (nnProxyInfo == null) {
+      return;
+    }
+    if (nnProxyInfo.proxy instanceof Closeable) {
+      ((Closeable)nnProxyInfo.proxy).close();
+    } else {
+      RPC.stopProxy(nnProxyInfo.proxy);
+    }
+  }
+
+  /**
+   * Logical URI is not used for IP failover.
+   */
+  @Override
+  public boolean useLogicalURI() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
new file mode 100644
index 0000000..b94e94d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+
+import org.apache.hadoop.io.retry.MultiException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A FailoverProxyProvider implementation that technically does not "failover"
+ * per se. It constructs a wrapper proxy that sends the request to ALL
+ * underlying proxies simultaneously. It assumes that in an HA setup there will
+ * be only one Active, and the Active should respond faster than any configured
+ * standbys. Once it receives a response from any one of the configured proxies,
+ * outstanding requests to the other proxies are immediately cancelled.
+ */
+public class RequestHedgingProxyProvider<T> extends
+        ConfiguredFailoverProxyProvider<T> {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RequestHedgingProxyProvider.class);
+
+  class RequestHedgingInvocationHandler implements InvocationHandler {
+
+    final Map<String, ProxyInfo<T>> targetProxies;
+
+    public RequestHedgingInvocationHandler(
+            Map<String, ProxyInfo<T>> targetProxies) {
+      this.targetProxies = new HashMap<>(targetProxies);
+    }
+
+    /**
+     * Creates an Executor and invokes all proxies concurrently. This
+     * implementation assumes that clients have configured proper socket
+     * timeouts, else the call can block forever.
+     *
+     * @param proxy the wrapper proxy (unused)
+     * @param method the method to invoke on each target proxy
+     * @param args the arguments to pass to the method
+     * @return the result from the first proxy to respond successfully
+     * @throws Throwable the lone failure, or a MultiException if all fail
+     */
+    @Override
+    public Object
+    invoke(Object proxy, final Method method, final Object[] args)
+            throws Throwable {
+      Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
+      int numAttempts = 0;
+
+      ExecutorService executor = null;
+      CompletionService<Object> completionService;
+      try {
+        // Optimization: if only 2 proxies are configured and one has failed
+        // over, then we don't need to create a threadpool etc.
+        targetProxies.remove(toIgnore);
+        if (targetProxies.size() == 1) {
+          ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next();
+          Object retVal = method.invoke(proxyInfo.proxy, args);
+          successfulProxy = proxyInfo;
+          return retVal;
+        }
+        executor = Executors.newFixedThreadPool(proxies.size());
+        completionService = new ExecutorCompletionService<>(executor);
+        for (final Map.Entry<String, ProxyInfo<T>> pEntry :
+                targetProxies.entrySet()) {
+          Callable<Object> c = new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+              LOG.trace("Invoking method {} on proxy {}", method,
+                  pEntry.getValue().proxyInfo);
+              return method.invoke(pEntry.getValue().proxy, args);
+            }
+          };
+          proxyMap.put(completionService.submit(c), pEntry.getValue());
+          numAttempts++;
+        }
+
+        Map<String, Exception> badResults = new HashMap<>();
+        while (numAttempts > 0) {
+          Future<Object> callResultFuture = completionService.take();
+          Object retVal;
+          try {
+            retVal = callResultFuture.get();
+            successfulProxy = proxyMap.get(callResultFuture);
+            LOG.debug("Invocation successful on [{}]",
+                successfulProxy.proxyInfo);
+            return retVal;
+          } catch (Exception ex) {
+            ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
+            logProxyException(ex, tProxyInfo.proxyInfo);
+            badResults.put(tProxyInfo.proxyInfo, unwrapException(ex));
+            LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
+            numAttempts--;
+          }
+        }
+
+        // At this point we should have all bad results (exceptions);
+        // otherwise we would already have returned a successful result.
+        if (badResults.size() == 1) {
+          throw badResults.values().iterator().next();
+        } else {
+          throw new MultiException(badResults);
+        }
+      } finally {
+        if (executor != null) {
+          LOG.trace("Shutting down threadpool executor");
+          executor.shutdownNow();
+        }
+      }
+    }
+  }
+
+
+  private volatile ProxyInfo<T> successfulProxy = null;
+  private volatile String toIgnore = null;
+
+  public RequestHedgingProxyProvider(Configuration conf, URI uri,
+      Class<T> xface, HAProxyFactory<T> proxyFactory) {
+    super(conf, uri, xface, proxyFactory);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized ProxyInfo<T> getProxy() {
+    if (successfulProxy != null) {
+      return successfulProxy;
+    }
+    Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
+    StringBuilder combinedInfo = new StringBuilder("[");
+    for (int i = 0; i < proxies.size(); i++) {
+      ProxyInfo<T> pInfo = super.getProxy();
+      incrementProxyIndex();
+      targetProxyInfos.put(pInfo.proxyInfo, pInfo);
+      combinedInfo.append(pInfo.proxyInfo).append(',');
+    }
+    combinedInfo.append(']');
+    T wrappedProxy = (T) Proxy.newProxyInstance(
+            RequestHedgingInvocationHandler.class.getClassLoader(),
+            new Class<?>[]{xface},
+            new RequestHedgingInvocationHandler(targetProxyInfos));
+    return new ProxyInfo<T>(wrappedProxy, combinedInfo.toString());
+  }
+
+  @Override
+  public synchronized void performFailover(T currentProxy) {
+    toIgnore = successfulProxy.proxyInfo;
+    successfulProxy = null;
+  }
+
+  /**
+   * Check the exception returned by the proxy and log a warning message if
+   * it's not a StandbyException (an expected exception).
+   * @param ex Exception to evaluate.
+   * @param proxyInfo Information of the proxy reporting the exception.
+   */
+  private void logProxyException(Exception ex, String proxyInfo) {
+    if (isStandbyException(ex)) {
+      LOG.debug("Invocation returned standby exception on [{}]", proxyInfo);
+    } else {
+      LOG.warn("Invocation returned exception on [{}]", proxyInfo);
+    }
+  }
+
+  /**
+   * Check if the returned exception is caused by a standby namenode.
+   * @param ex Exception to check.
+   * @return True if the exception is caused by a standby namenode.
+   */
+  private boolean isStandbyException(Exception ex) {
+    Exception exception = unwrapException(ex);
+    if (exception instanceof RemoteException) {
+      return ((RemoteException) exception).unwrapRemoteException()
+          instanceof StandbyException;
+    }
+    return false;
+  }
+
+  /**
+   * Unwraps the exception. <p>
+   * Example:
+   * <blockquote><pre>
+   * if ex is
+   * ExecutionException(InvocationTargetException(SomeException))
+   * returns SomeException
+   * </pre></blockquote>
+   *
+   * @return unwrapped exception
+   */
+  private Exception unwrapException(Exception ex) {
+    if (ex != null) {
+      Throwable cause = ex.getCause();
+      if (cause instanceof Exception) {
+        Throwable innerCause = cause.getCause();
+        if (innerCause instanceof Exception) {
+          return (Exception) innerCause;
+        }
+        return (Exception) cause;
+      }
+    }
+    return ex;
+  }
+}

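For illustration, a hedged sketch of selecting this provider for a hypothetical nameservice; it assumes the standard HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX key ("dfs.client.failover.proxy.provider"):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.ha.RequestHedgingProxyProvider;

public class HedgingProviderSketch {
  static void enableHedging(Configuration conf) {
    // Route calls for the logical URI hdfs://mycluster through the
    // hedging provider, which invokes all configured NNs concurrently.
    conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
        + ".mycluster", RequestHedgingProxyProvider.class.getName());
  }
}
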
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
new file mode 100644
index 0000000..724b5f0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.io.retry.MultiException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+
+public class TestRequestHedgingProxyProvider {
+
+  private Configuration conf;
+  private URI nnUri;
+  private String ns;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    GenericTestUtils.setLogLevel(RequestHedgingProxyProvider.LOG, Level.TRACE);
+  }
+
+  @Before
+  public void setup() throws URISyntaxException {
+    ns = "mycluster-" + Time.monotonicNow();
+    nnUri = new URI("hdfs://" + ns);
+    conf = new Configuration();
+    conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
+    conf.set(
+        HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
+    conf.set(
+        HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
+        "machine1.foo.bar:9820");
+    conf.set(
+        HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
+        "machine2.foo.bar:9820");
+  }
+
+  @Test
+  public void testHedgingWhenOneFails() throws Exception {
+    final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(1000);
+        return new long[]{1};
+      }
+    });
+    final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
+
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
+            createFactory(badMock, goodMock));
+    long[] stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(goodMock).getStats();
+  }
+
+  @Test
+  public void testHedgingWhenOneIsSlow() throws Exception {
+    final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(1000);
+        return new long[]{1};
+      }
+    });
+    final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
+
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
+            createFactory(goodMock, badMock));
+    long[] stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(goodMock).getStats();
+  }
+
+  @Test
+  public void testHedgingWhenBothFail() throws Exception {
+    ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
+    ClientProtocol worseMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(worseMock.getStats()).thenThrow(
+            new IOException("Worse mock !!"));
+
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
+            createFactory(badMock, worseMock));
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since both namenodes throw IOException !!");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof MultiException);
+    }
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(worseMock).getStats();
+  }
+
+  @Test
+  public void testPerformFailover() throws Exception {
+    final AtomicInteger counter = new AtomicInteger(0);
+    final int[] isGood = {1};
+    final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 1) {
+          Thread.sleep(1000);
+          return new long[]{1};
+        }
+        throw new IOException("Was Good mock !!");
+      }
+    });
+    final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 2) {
+          Thread.sleep(1000);
+          return new long[]{2};
+        }
+        throw new IOException("Bad mock !!");
+      }
+    });
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+            new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
+                    createFactory(goodMock, badMock));
+    long[] stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    Assert.assertEquals(2, counter.get());
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(goodMock).getStats();
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    // Ensure only the previously successful one is invoked
+    Mockito.verifyNoMoreInteractions(badMock);
+    Assert.assertEquals(3, counter.get());
+
+    // Flip to standby.. so now this should fail
+    isGood[0] = 2;
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since previously successful proxy now fails ");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IOException);
+    }
+
+    Assert.assertEquals(4, counter.get());
+
+    provider.performFailover(provider.getProxy().proxy);
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(2, stats[0]);
+
+    // Counter should update only once
+    Assert.assertEquals(5, counter.get());
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(2, stats[0]);
+
+    // Counter updates only once now
+    Assert.assertEquals(6, counter.get());
+
+    // Flip back to old active.. so now this should fail
+    isGood[0] = 1;
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since previously successful proxy now fails ");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IOException);
+    }
+
+    Assert.assertEquals(7, counter.get());
+
+    provider.performFailover(provider.getProxy().proxy);
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    // Ensure correct proxy was called
+    Assert.assertEquals(1, stats[0]);
+  }
+
+  @Test
+  public void testPerformFailoverWith3Proxies() throws Exception {
+    conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
+            "nn1,nn2,nn3");
+    conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
+            "machine3.foo.bar:9820");
+
+    final AtomicInteger counter = new AtomicInteger(0);
+    final int[] isGood = {1};
+    final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 1) {
+          Thread.sleep(1000);
+          return new long[]{1};
+        }
+        throw new IOException("Was Good mock !!");
+      }
+    });
+    final ClientProtocol badMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 2) {
+          Thread.sleep(1000);
+          return new long[]{2};
+        }
+        throw new IOException("Bad mock !!");
+      }
+    });
+    final ClientProtocol worseMock = Mockito.mock(ClientProtocol.class);
+    Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() {
+      @Override
+      public long[] answer(InvocationOnMock invocation) throws Throwable {
+        counter.incrementAndGet();
+        if (isGood[0] == 3) {
+          Thread.sleep(1000);
+          return new long[]{3};
+        }
+        throw new IOException("Worse mock !!");
+      }
+    });
+
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+            new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
+                    createFactory(goodMock, badMock, worseMock));
+    long[] stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    Assert.assertEquals(3, counter.get());
+    Mockito.verify(badMock).getStats();
+    Mockito.verify(goodMock).getStats();
+    Mockito.verify(worseMock).getStats();
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(1, stats[0]);
+    // Ensure only the previously successful one is invoked
+    Mockito.verifyNoMoreInteractions(badMock);
+    Mockito.verifyNoMoreInteractions(worseMock);
+    Assert.assertEquals(4, counter.get());
+
+    // Flip to standby.. so now this should fail
+    isGood[0] = 2;
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since previously successful proxy now fails ");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IOException);
+    }
+
+    Assert.assertEquals(5, counter.get());
+
+    provider.performFailover(provider.getProxy().proxy);
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(2, stats[0]);
+
+    // Counter updates twice since both proxies are tried on failure
+    Assert.assertEquals(7, counter.get());
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(2, stats[0]);
+
+    // Counter updates only once now
+    Assert.assertEquals(8, counter.get());
+
+    // Flip to Other standby.. so now this should fail
+    isGood[0] = 3;
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since previously successful proxy now fails ");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IOException);
+    }
+
+    // Counter should update only once
+    Assert.assertEquals(9, counter.get());
+
+    provider.performFailover(provider.getProxy().proxy);
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+
+    // Ensure correct proxy was called
+    Assert.assertEquals(3, stats[0]);
+
+    // Counter updates twice since both proxies are tried on failure
+    Assert.assertEquals(11, counter.get());
+
+    stats = provider.getProxy().proxy.getStats();
+    Assert.assertTrue(stats.length == 1);
+    Assert.assertEquals(3, stats[0]);
+
+    // Counter updates only once now
+    Assert.assertEquals(12, counter.get());
+  }
+
+  @Test
+  public void testHedgingWhenFileNotFoundException() throws Exception {
+    ClientProtocol active = Mockito.mock(ClientProtocol.class);
+    Mockito
+        .when(active.getBlockLocations(Matchers.anyString(),
+            Matchers.anyLong(), Matchers.anyLong()))
+        .thenThrow(new RemoteException("java.io.FileNotFoundException",
+            "File does not exist!"));
+
+    ClientProtocol standby = Mockito.mock(ClientProtocol.class);
+    Mockito
+        .when(standby.getBlockLocations(Matchers.anyString(),
+            Matchers.anyLong(), Matchers.anyLong()))
+        .thenThrow(
+            new RemoteException("org.apache.hadoop.ipc.StandbyException",
+            "Standby NameNode"));
+
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri,
+          ClientProtocol.class, createFactory(active, standby));
+    try {
+      provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
+      Assert.fail("Should fail since the active namenode throws"
+          + " FileNotFoundException!");
+    } catch (MultiException me) {
+      for (Exception ex : me.getExceptions().values()) {
+        Exception rEx = ((RemoteException) ex).unwrapRemoteException();
+        if (rEx instanceof StandbyException) {
+          continue;
+        }
+        Assert.assertTrue(rEx instanceof FileNotFoundException);
+      }
+    }
+    Mockito.verify(active).getBlockLocations(Matchers.anyString(),
+        Matchers.anyLong(), Matchers.anyLong());
+    Mockito.verify(standby).getBlockLocations(Matchers.anyString(),
+        Matchers.anyLong(), Matchers.anyLong());
+  }
+
+  @Test
+  public void testHedgingWhenConnectException() throws Exception {
+    ClientProtocol active = Mockito.mock(ClientProtocol.class);
+    Mockito.when(active.getStats()).thenThrow(new ConnectException());
+
+    ClientProtocol standby = Mockito.mock(ClientProtocol.class);
+    Mockito.when(standby.getStats())
+        .thenThrow(
+            new RemoteException("org.apache.hadoop.ipc.StandbyException",
+            "Standby NameNode"));
+
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri,
+          ClientProtocol.class, createFactory(active, standby));
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since the active namenode throws"
+          + " ConnectException!");
+    } catch (MultiException me) {
+      for (Exception ex : me.getExceptions().values()) {
+        if (ex instanceof RemoteException) {
+          Exception rEx = ((RemoteException) ex)
+              .unwrapRemoteException();
+          Assert.assertTrue("Unexpected RemoteException: " + rEx.getMessage(),
+              rEx instanceof StandbyException);
+        } else {
+          Assert.assertTrue(ex instanceof ConnectException);
+        }
+      }
+    }
+    Mockito.verify(active).getStats();
+    Mockito.verify(standby).getStats();
+  }
+
+  @Test
+  public void testHedgingWhenConnectAndEOFException() throws Exception {
+    ClientProtocol active = Mockito.mock(ClientProtocol.class);
+    Mockito.when(active.getStats()).thenThrow(new EOFException());
+
+    ClientProtocol standby = Mockito.mock(ClientProtocol.class);
+    Mockito.when(standby.getStats()).thenThrow(new ConnectException());
+
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri,
+          ClientProtocol.class, createFactory(active, standby));
+    try {
+      provider.getProxy().proxy.getStats();
+      Assert.fail("Should fail since both active and standby namenodes throw"
+          + " Exceptions!");
+    } catch (MultiException me) {
+      for (Exception ex : me.getExceptions().values()) {
+        if (!(ex instanceof ConnectException) &&
+            !(ex instanceof EOFException)) {
+          Assert.fail("Unexpected Exception " + ex.getMessage());
+        }
+      }
+    }
+    Mockito.verify(active).getStats();
+    Mockito.verify(standby).getStats();
+  }
+
+  private HAProxyFactory<ClientProtocol> createFactory(
+      ClientProtocol... protos) {
+    final Iterator<ClientProtocol> iterator =
+        Lists.newArrayList(protos).iterator();
+    return new HAProxyFactory<ClientProtocol>() {
+      @Override
+      public ClientProtocol createProxy(Configuration conf,
+          InetSocketAddress nnAddr, Class<ClientProtocol> xface,
+          UserGroupInformation ugi, boolean withRetries,
+          AtomicBoolean fallbackToSimpleAuth) throws IOException {
+        return iterator.next();
+      }
+
+      @Override
+      public ClientProtocol createProxy(Configuration conf,
+          InetSocketAddress nnAddr, Class<ClientProtocol> xface,
+          UserGroupInformation ugi, boolean withRetries) throws IOException {
+        return iterator.next();
+      }
+    };
+  }
+}
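
The hedging behaviour exercised by these tests is selected purely through
client configuration. Below is a minimal sketch, not part of this patch, of
wiring RequestHedgingProxyProvider into an HA client; the nameservice id,
namenode ids and host:port values are illustrative placeholders.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HedgingClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Logical nameservice with two NameNodes (all values are examples).
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "nn1.example.com:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "nn2.example.com:8020");
        // Select the hedging provider; the package name is untouched by this
        // patch, only the module housing the class moves to hadoop-hdfs-client.
        conf.set("dfs.client.failover.proxy.provider.mycluster",
            "org.apache.hadoop.hdfs.server.namenode.ha.RequestHedgingProxyProvider");
        // The first call fans out to all NameNodes; the fastest successful
        // responder is remembered and used for subsequent calls.
        FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);
        System.out.println(fs.getFileStatus(new Path("/")));
      }
    }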

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8fea41f..58a2823 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -142,7 +142,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
   public static final String  DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host";
-  public static final String  DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
+  public static final String  DFS_NAMENODE_RPC_ADDRESS_KEY =
+      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
   public static final String  DFS_NAMENODE_RPC_BIND_HOST_KEY = "dfs.namenode.rpc-bind-host";
   public static final String  DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.servicerpc-address";
   public static final String  DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY = "dfs.namenode.servicerpc-bind-host";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 23166e2..47e1c0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -450,19 +450,6 @@ public class DFSUtil {
   }
 
   /**
-   * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
-   * the configuration.
-   * 
-   * @param conf configuration
-   * @return list of InetSocketAddresses
-   */
-  public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
-      Configuration conf) {
-    return DFSUtilClient.getAddresses(conf, null,
-                                      DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
-  }
-
-  /**
    * Returns list of InetSocketAddress corresponding to  backup node rpc 
    * addresses from the configuration.
    * 
@@ -693,7 +680,7 @@ public class DFSUtil {
   
   public static String nnAddressesAsString(Configuration conf) {
     Map<String, Map<String, InetSocketAddress>> addresses =
-      getHaNnRpcAddresses(conf);
+        DFSUtilClient.getHaNnRpcAddresses(conf);
     return addressMapToString(addresses);
   }
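
With the hunk above, callers outside this file resolve HA NameNode RPC
addresses through the client-side utility instead. A short hedged sketch of
the relocated lookup (the printed ids are whatever the Configuration defines):

    import java.net.InetSocketAddress;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtilClient;

    public class HaAddressLookupSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Outer key: nameservice id; inner key: namenode id (e.g. nn1, nn2).
        Map<String, Map<String, InetSocketAddress>> addrs =
            DFSUtilClient.getHaNnRpcAddresses(conf);
        addrs.forEach((ns, nns) -> nns.forEach((nnId, addr) ->
            System.out.println(ns + "." + nnId + " -> " + addr)));
      }
    }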
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index ea535e9..3556086 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -29,7 +29,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
-import static org.apache.hadoop.security.SecurityUtil.buildTokenService;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -39,8 +38,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -48,17 +45,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -67,12 +59,6 @@ import com.google.common.collect.Lists;
 @InterfaceAudience.Private
 public class HAUtil {
   
-  private static final Log LOG = 
-    LogFactory.getLog(HAUtil.class);
-  
-  private static final DelegationTokenSelector tokenSelector =
-      new DelegationTokenSelector();
-
   private static final String[] HA_SPECIAL_INDEPENDENT_KEYS = new String[]{
     DFS_NAMENODE_RPC_ADDRESS_KEY,
     DFS_NAMENODE_RPC_BIND_HOST_KEY,
@@ -97,7 +83,7 @@ public class HAUtil {
    */
   public static boolean isHAEnabled(Configuration conf, String nsId) {
     Map<String, Map<String, InetSocketAddress>> addresses =
-      DFSUtil.getHaNnRpcAddresses(conf);
+        DFSUtilClient.getHaNnRpcAddresses(conf);
     if (addresses == null) return false;
     Map<String, InetSocketAddress> nnMap = addresses.get(nsId);
     return nnMap != null && nnMap.size() > 1;
@@ -260,47 +246,6 @@ public class HAUtil {
   }
 
   /**
-   * Locate a delegation token associated with the given HA cluster URI, and if
-   * one is found, clone it to also represent the underlying namenode address.
-   * @param ugi the UGI to modify
-   * @param haUri the logical URI for the cluster
-   * @param nnAddrs collection of NNs in the cluster to which the token
-   * applies
-   */
-  public static void cloneDelegationTokenForLogicalUri(
-      UserGroupInformation ugi, URI haUri,
-      Collection<InetSocketAddress> nnAddrs) {
-    // this cloning logic is only used by hdfs
-    Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
-                                                                 HdfsConstants.HDFS_URI_SCHEME);
-    Token<DelegationTokenIdentifier> haToken =
-        tokenSelector.selectToken(haService, ugi.getTokens());
-    if (haToken != null) {
-      for (InetSocketAddress singleNNAddr : nnAddrs) {
-        // this is a minor hack to prevent physical HA tokens from being
-        // exposed to the user via UGI.getCredentials(), otherwise these
-        // cloned tokens may be inadvertently propagated to jobs
-        Token<DelegationTokenIdentifier> specificToken =
-            haToken.privateClone(buildTokenService(singleNNAddr));
-        Text alias = new Text(
-            HAUtilClient.buildTokenServicePrefixForLogicalUri(
-                HdfsConstants.HDFS_URI_SCHEME)
-                + "//" + specificToken.getService());
-        ugi.addToken(alias, specificToken);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Mapped HA service delegation token for logical URI " +
-              haUri + " to namenode " + singleNNAddr);
-        }
-      }
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No HA service delegation token found for logical URI " +
-            haUri);
-      }
-    }
-  }
-
-  /**
    * Get the internet address of the currently-active NN. This should rarely be
    * used, since callers of this method who connect directly to the NN using the
    * resulting InetSocketAddress will not be able to connect to the active NN if

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index 61d701d..d556c90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.Text;
@@ -112,7 +113,7 @@ public class NameNodeProxies {
       throws IOException {
     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
         NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri,
-            xface, true, fallbackToSimpleAuth);
+            xface, true, fallbackToSimpleAuth, new NameNodeHAProxyFactory<T>());
 
     if (failoverProxyProvider == null) {
       return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
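
The only change of shape here is the explicit factory argument: the shared
createFailoverProxyProvider in NameNodeProxiesClient no longer reaches back
into NameNodeProxies, so the server module injects its own factory. A sketch
of the new call, with conf, nameNodeUri and fallbackToSimpleAuth assumed in
scope as in createProxy above, and ClientProtocol standing in for the
generic type:

    AbstractNNFailoverProxyProvider<ClientProtocol> provider =
        NameNodeProxiesClient.createFailoverProxyProvider(conf, nameNodeUri,
            ClientProtocol.class, true, fallbackToSimpleAuth,
            new NameNodeHAProxyFactory<ClientProtocol>());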

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
index 8edaed6..6b489fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
@@ -17,19 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -63,25 +57,6 @@ abstract class DfsServlet extends HttpServlet {
     doc.endTag();
   }
 
-  /**
-   * Create a {@link NameNode} proxy from the current {@link ServletContext}. 
-   */
-  protected ClientProtocol createNameNodeProxy() throws IOException {
-    ServletContext context = getServletContext();
-    // if we are running in the Name Node, use it directly rather than via 
-    // rpc
-    NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
-    if (nn != null) {
-      return nn.getRpcServer();
-    }
-    InetSocketAddress nnAddr =
-      NameNodeHttpServer.getNameNodeAddressFromContext(context);
-    Configuration conf = new HdfsConfiguration(
-        NameNodeHttpServer.getConfFromContext(context));
-    return NameNodeProxies.createProxy(conf, DFSUtilClient.getNNUri(nnAddr),
-        ClientProtocol.class).getProxy();
-  }
-
   protected UserGroupInformation getUGI(HttpServletRequest request,
                                         Configuration conf) throws IOException {
     return JspHelper.getUGI(getServletContext(), request, conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
deleted file mode 100644
index 0e8fa44..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode.ha;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HAUtil;
-import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * A FailoverProxyProvider implementation which allows one to configure
- * multiple URIs to connect to during fail-over. A random configured address is
- * tried first, and on a fail-over event the other addresses are tried
- * sequentially in a random order.
- */
-public class ConfiguredFailoverProxyProvider<T> extends
-    AbstractNNFailoverProxyProvider<T> {
-  
-  private static final Log LOG =
-      LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
-  
-  interface ProxyFactory<T> {
-    T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
-        UserGroupInformation ugi, boolean withRetries,
-        AtomicBoolean fallbackToSimpleAuth) throws IOException;
-  }
-
-  static class DefaultProxyFactory<T> implements ProxyFactory<T> {
-    @Override
-    public T createProxy(Configuration conf, InetSocketAddress nnAddr,
-        Class<T> xface, UserGroupInformation ugi, boolean withRetries,
-        AtomicBoolean fallbackToSimpleAuth) throws IOException {
-      return NameNodeProxies.createNonHAProxy(conf,
-          nnAddr, xface, ugi, false, fallbackToSimpleAuth).getProxy();
-    }
-  }
-
-  protected final Configuration conf;
-  protected final List<AddressRpcProxyPair<T>> proxies =
-      new ArrayList<AddressRpcProxyPair<T>>();
-  private final UserGroupInformation ugi;
-  protected final Class<T> xface;
-
-  private int currentProxyIndex = 0;
-  private final ProxyFactory<T> factory;
-
-  public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
-      Class<T> xface) {
-    this(conf, uri, xface, new DefaultProxyFactory<T>());
-  }
-
-  @VisibleForTesting
-  ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
-      Class<T> xface, ProxyFactory<T> factory) {
-
-    Preconditions.checkArgument(
-        xface.isAssignableFrom(NamenodeProtocols.class),
-        "Interface class %s is not a valid NameNode protocol!");
-    this.xface = xface;
-    
-    this.conf = new Configuration(conf);
-    int maxRetries = this.conf.getInt(
-        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
-        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
-    this.conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
-        maxRetries);
-    
-    int maxRetriesOnSocketTimeouts = this.conf.getInt(
-        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
-        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
-    this.conf.setInt(
-            CommonConfigurationKeysPublic
-                    .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
-            maxRetriesOnSocketTimeouts);
-
-    try {
-      ugi = UserGroupInformation.getCurrentUser();
-      
-      Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(
-          conf);
-      Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
-      
-      if (addressesInNN == null || addressesInNN.size() == 0) {
-        throw new RuntimeException("Could not find any configured addresses " +
-            "for URI " + uri);
-      }
-      
-      Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
-      for (InetSocketAddress address : addressesOfNns) {
-        proxies.add(new AddressRpcProxyPair<T>(address));
-      }
-      // Randomize the list to prevent all clients pointing to the same one
-      boolean randomized = conf.getBoolean(
-          HdfsClientConfigKeys.Failover.RANDOM_ORDER,
-          HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
-      if (randomized) {
-        Collections.shuffle(proxies);
-      }
-
-      // The client may have a delegation token set for the logical
-      // URI of the cluster. Clone this token to apply to each of the
-      // underlying IPC addresses so that the IPC code can find it.
-      HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
-      this.factory = factory;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-    
-  @Override
-  public Class<T> getInterface() {
-    return xface;
-  }
-
-  /**
-   * Lazily initialize the RPC proxy object.
-   */
-  @Override
-  public synchronized ProxyInfo<T> getProxy() {
-    AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
-    if (current.namenode == null) {
-      try {
-        current.namenode = factory.createProxy(conf,
-            current.address, xface, ugi, false, getFallbackToSimpleAuth());
-      } catch (IOException e) {
-        LOG.error("Failed to create RPC proxy to NameNode", e);
-        throw new RuntimeException(e);
-      }
-    }
-    return new ProxyInfo<T>(current.namenode, current.address.toString());
-  }
-
-  @Override
-  public  void performFailover(T currentProxy) {
-    incrementProxyIndex();
-  }
-
-  synchronized void incrementProxyIndex() {
-    currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
-  }
-
-  /**
-   * A little pair object to store the address and connected RPC proxy object to
-   * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
-   */
-  private static class AddressRpcProxyPair<T> {
-    public final InetSocketAddress address;
-    public T namenode;
-    
-    public AddressRpcProxyPair(InetSocketAddress address) {
-      this.address = address;
-    }
-  }
-
-  /**
-   * Close all the proxy objects which have been opened over the lifetime of
-   * this proxy provider.
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    for (AddressRpcProxyPair<T> proxy : proxies) {
-      if (proxy.namenode != null) {
-        if (proxy.namenode instanceof Closeable) {
-          ((Closeable)proxy.namenode).close();
-        } else {
-          RPC.stopProxy(proxy.namenode);
-        }
-      }
-    }
-  }
-
-  /**
-   * Logical URI is required for this failover proxy provider.
-   */
-  @Override
-  public boolean useLogicalURI() {
-    return true;
-  }
-}
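
This deletion is the server-module half of the move: the provider reappears
under the same package in hadoop-hdfs-client, so the fully qualified class
name used in client configuration keeps resolving. As a hedged reminder, the
provider is selected per nameservice by class name ("mycluster" is an
illustrative placeholder):

    conf.set("dfs.client.failover.proxy.provider.mycluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");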

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
deleted file mode 100644
index 4e1cb9e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode.ha;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A NNFailoverProxyProvider implementation which works on IP failover setup.
- * Only one proxy is used to connect to both servers and switching between
- * the servers is done by the environment/infrastructure, which guarantees
- * clients can consistently reach only one node at a time.
- *
- * Clients with a live connection will likely get connection reset after an
- * IP failover. This case will be handled by the 
- * FailoverOnNetworkExceptionRetry retry policy. I.e. if the call is
- * not idempotent, it won't get retried.
- *
- * A connection reset while setting up a connection (i.e. before sending a
- * request) will be handled in ipc client.
- *
- * The namenode URI must contain a resolvable host name.
- */
-public class IPFailoverProxyProvider<T> extends
-    AbstractNNFailoverProxyProvider<T> {
-  private final Configuration conf;
-  private final Class<T> xface;
-  private final URI nameNodeUri;
-  private ProxyInfo<T> nnProxyInfo = null;
-  
-  public IPFailoverProxyProvider(Configuration conf, URI uri,
-      Class<T> xface) {
-    Preconditions.checkArgument(
-        xface.isAssignableFrom(NamenodeProtocols.class),
-        "Interface class %s is not a valid NameNode protocol!");
-    this.xface = xface;
-    this.nameNodeUri = uri;
-
-    this.conf = new Configuration(conf);
-    int maxRetries = this.conf.getInt(
-        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
-        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
-    this.conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
-        maxRetries);
-    
-    int maxRetriesOnSocketTimeouts = this.conf.getInt(
-        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
-        HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
-    this.conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
-        maxRetriesOnSocketTimeouts);
-  }
-    
-  @Override
-  public Class<T> getInterface() {
-    return xface;
-  }
-
-  @Override
-  public synchronized ProxyInfo<T> getProxy() {
-    // Create a non-ha proxy if not already created.
-    if (nnProxyInfo == null) {
-      try {
-        // Create a proxy that is not wrapped in RetryProxy
-        InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
-        nnProxyInfo = new ProxyInfo<T>(NameNodeProxies.createNonHAProxy(
-            conf, nnAddr, xface, UserGroupInformation.getCurrentUser(), 
-            false).getProxy(), nnAddr.toString());
-      } catch (IOException ioe) {
-        throw new RuntimeException(ioe);
-      }
-    }
-    return nnProxyInfo;
-  }
-
-  /** Nothing to do for IP failover */
-  @Override
-  public void performFailover(T currentProxy) {
-  }
-
-  /**
-   * Close the proxy,
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    if (nnProxyInfo == null) {
-      return;
-    }
-    if (nnProxyInfo.proxy instanceof Closeable) {
-      ((Closeable)nnProxyInfo.proxy).close();
-    } else {
-      RPC.stopProxy(nnProxyInfo.proxy);
-    }
-  }
-
-  /**
-   * Logical URI is not used for IP failover.
-   */
-  @Override
-  public boolean useLogicalURI() {
-    return false;
-  }
-}
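
IPFailoverProxyProvider is likewise moved rather than removed. A hedged usage
sketch, assuming the provider key is matched against the URI host and that a
virtual IP (the placeholder nn-vip.example.com) is managed by the surrounding
infrastructure:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class IpFailoverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // One resolvable address; failover happens in the network layer,
        // which is why performFailover() above is intentionally a no-op.
        conf.set("dfs.client.failover.proxy.provider.nn-vip.example.com",
            "org.apache.hadoop.hdfs.server.namenode.ha.IPFailoverProxyProvider");
        FileSystem fs = FileSystem.get(
            URI.create("hdfs://nn-vip.example.com:8020"), conf);
        System.out.println(fs.getUri());
      }
    }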

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
new file mode 100644
index 0000000..036b6eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> {
+
+  @Override
+  public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+      Class<T> xface, UserGroupInformation ugi, boolean withRetries,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+    return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
+      ugi, withRetries, fallbackToSimpleAuth).getProxy();
+  }
+
+  @Override
+  public T createProxy(Configuration conf, InetSocketAddress nnAddr,
+      Class<T> xface, UserGroupInformation ugi, boolean withRetries)
+      throws IOException {
+    return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
+      ugi, withRetries).getProxy();
+  }
+}
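
This server-side factory is the seam that lets the relocated failover
providers create RPC proxies without a compile-time dependency on
NameNodeProxies. A minimal hedged sketch of using it directly (the address is
an illustrative placeholder; in practice the failover providers invoke it):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
    import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyFactorySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        InetSocketAddress nnAddr =
            new InetSocketAddress("nn1.example.com", 8020);
        HAProxyFactory<ClientProtocol> factory =
            new NameNodeHAProxyFactory<ClientProtocol>();
        // Non-HA proxy without retries; mirrors the createNonHAProxy call above.
        ClientProtocol proxy = factory.createProxy(conf, nnAddr,
            ClientProtocol.class, UserGroupInformation.getCurrentUser(), false);
        System.out.println(proxy.getStats().length);
      }
    }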



