hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1232284 - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/balancer/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/
Date Tue, 17 Jan 2012 03:10:26 GMT
Author: todd
Date: Tue Jan 17 03:10:25 2012
New Revision: 1232284

URL: http://svn.apache.org/viewvc?rev=1232284&view=rev
Log:
HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol. Contributed by Uma Maheswara Rao G.
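
In effect, this patch centralizes failover-proxy construction in HAUtil so that
callers can request a failover-capable proxy for any NameNode protocol, not just
ClientProtocol. A minimal caller sketch, illustrative only (the logical URI
"hdfs://mycluster" is an assumption, not part of this commit):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

public class FailoverProxySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    URI nameNodeUri = URI.create("hdfs://mycluster");

    // Previously only ClientProtocol could be wrapped in a failover proxy;
    // with this patch the balancer's NamenodeProtocol works the same way.
    ClientProtocol client = (ClientProtocol)
        HAUtil.createFailoverProxy(conf, nameNodeUri, ClientProtocol.class);
    NamenodeProtocol nn = (NamenodeProtocol)
        HAUtil.createFailoverProxy(conf, nameNodeUri, NamenodeProtocol.class);

    // createFailoverProxy() returns null when no failover proxy provider is
    // configured for the URI's host; callers then fall back to a plain
    // non-HA proxy, as DFSClient does in the hunk below.
    if (client == null || nn == null) {
      System.err.println("No failover proxy provider configured for "
          + nameNodeUri.getHost());
    }
  }
}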

Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Tue Jan 17 03:10:25 2012
@@ -109,3 +109,5 @@ HDFS-2789. TestHAAdmin.testFailover is f
 HDFS-2747. Entering safe mode after starting SBN can NPE. (Uma Maheswara Rao G via todd)
 
 HDFS-2772. On transition to active, standby should not swallow ELIE. (atm)
+
+HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol. (Uma Maheswara Rao G via todd)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jan 17 03:10:25 2012
@@ -94,9 +94,6 @@ import org.apache.hadoop.io.EnumSetWrita
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -109,7 +106,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -312,20 +308,10 @@ public class DFSClient implements java.i
     this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
     
     this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
-    
-    Class<?> failoverProxyProviderClass = getFailoverProxyProviderClass(
-        nameNodeUri, conf);
-    
-    if (nameNodeUri != null && failoverProxyProviderClass != null) {
-      FailoverProxyProvider failoverProxyProvider = (FailoverProxyProvider)
-          ReflectionUtils.newInstance(failoverProxyProviderClass, conf);
-      this.namenode = (ClientProtocol)RetryProxy.create(ClientProtocol.class,
-          failoverProxyProvider,
-          RetryPolicies.failoverOnNetworkException(
-              RetryPolicies.TRY_ONCE_THEN_FAIL,
-              dfsClientConf.maxFailoverAttempts,
-              dfsClientConf.failoverSleepBaseMillis,
-              dfsClientConf.failoverSleepMaxMillis));
+    ClientProtocol failoverNNProxy = (ClientProtocol) HAUtil
+        .createFailoverProxy(conf, nameNodeUri, ClientProtocol.class);
+    if (nameNodeUri != null && failoverNNProxy != null) {
+      this.namenode = failoverNNProxy;
       nnAddress = null;
     } else if (nameNodeUri != null && rpcNamenode == null) {
       this.namenode = DFSUtil.createNamenode(NameNode.getAddress(nameNodeUri), conf);
@@ -353,39 +339,6 @@ public class DFSClient implements java.i
       LOG.debug("Short circuit read is " + shortCircuitLocalReads);
     }
   }
-  
-  private Class<?> getFailoverProxyProviderClass(URI nameNodeUri, Configuration conf)
-      throws IOException {
-    if (nameNodeUri == null) {
-      return null;
-    }
-    String host = nameNodeUri.getHost();
-
-    String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + host;
-    try {
-      Class<?> ret = conf.getClass(configKey, null);
-      if (ret != null) {
-        // If we found a proxy provider, then this URI should be a logical NN.
-        // Given that, it shouldn't have a non-default port number.
-        int port = nameNodeUri.getPort();
-        if (port > 0 && port != NameNode.DEFAULT_PORT) {
-          throw new IOException(
-              "Port " + port + " specified in URI " + nameNodeUri +
-              " but host '" + host + "' is a logical (HA) namenode" +
-              " and does not use port information.");
-        }
-      }
-      return ret;
-    } catch (RuntimeException e) {
-      if (e.getCause() instanceof ClassNotFoundException) {
-        throw new IOException("Could not load failover proxy provider class "
-            + conf.get(configKey) + " which is configured for authority " + nameNodeUri,
-            e);
-      } else {
-        throw e;
-      }
-    }
-  }
 
   /**
    * Return the number of times the client should go back to the namenode

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Jan 17 03:10:25 2012
@@ -28,10 +28,12 @@ import java.security.SecureRandom;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.SocketFactory;
 
@@ -47,7 +49,12 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
@@ -810,6 +817,32 @@ public class DFSUtil {
   }
   
   /**
+   * Build a NamenodeProtocol connection to the namenode and set up the retry
+   * policy
+   */
+  public static NamenodeProtocolTranslatorPB createNNProxyWithNamenodeProtocol(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+      throws IOException {
+    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
+        TimeUnit.MILLISECONDS);
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap 
+        = new HashMap<Class<? extends Exception>, RetryPolicy>();
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy,
+        exceptionToPolicyMap);
+    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
+    methodNameToPolicyMap.put("getBlocks", methodPolicy);
+    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
+    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class, RPC
+        .getProtocolVersion(NamenodeProtocolPB.class), address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf));
+    NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
+        NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
+    return new NamenodeProtocolTranslatorPB(retryProxy);
+  }
+  
+  /**
    * Get nameservice Id for the {@link NameNode} based on namenode RPC address
    * matching the local node address.
    */
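
The helper above stacks three retry layers: an exponential backoff policy, an
exception-to-policy map (empty, so the backoff is the default for every
exception), and a per-method map that applies the composite only to getBlocks
and getAccessKeys. A self-contained sketch of the same composition,
illustrative only (the Echo interface and its flaky implementation are invented
for demonstration):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;

public class RetryCompositionSketch {
  /** Toy protocol standing in for NamenodeProtocolPB. */
  public interface Echo {
    String echo(String msg) throws IOException;
  }

  public static void main(String[] args) throws IOException {
    // Retry up to 5 times, backing off exponentially from 200ms, as in
    // createNNProxyWithNamenodeProtocol() above.
    RetryPolicy backoff =
        RetryPolicies.exponentialBackoffRetry(5, 200, TimeUnit.MILLISECONDS);

    // An empty exception map means every exception falls through to the
    // default policy, i.e. the backoff above.
    Map<Class<? extends Exception>, RetryPolicy> byException =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    RetryPolicy methodPolicy = RetryPolicies.retryByException(backoff, byException);

    // Only methods named here are retried ("getBlocks"/"getAccessKeys" in the
    // patch); unnamed methods default to TRY_ONCE_THEN_FAIL.
    Map<String, RetryPolicy> byMethod = new HashMap<String, RetryPolicy>();
    byMethod.put("echo", methodPolicy);

    Echo flaky = new Echo() {
      private int calls = 0;
      public String echo(String msg) throws IOException {
        if (++calls < 3) {
          throw new IOException("transient failure #" + calls);
        }
        return msg;
      }
    };

    Echo retrying = (Echo) RetryProxy.create(Echo.class, flaky, byMethod);
    System.out.println(retrying.echo("hello")); // succeeds on the third attempt
  }
}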

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Tue Jan 17 03:10:25 2012
@@ -18,13 +18,23 @@
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-
+import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.net.URI;
 import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
 
 public class HAUtil {
   private HAUtil() { /* Hidden constructor */ }
@@ -110,5 +120,84 @@ public class HAUtil {
   public static void setAllowStandbyReads(Configuration conf, boolean val) {
     conf.setBoolean("dfs.ha.allow.stale.reads", val);
   }
+ 
+  /** Creates the Failover proxy provider instance*/
+  @SuppressWarnings("unchecked")
+  public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
+      Configuration conf, Class<FailoverProxyProvider<?>> failoverProxyProviderClass,
+      Class xface) throws IOException {
+    Preconditions.checkArgument(
+        xface.isAssignableFrom(NamenodeProtocols.class),
+        "Interface %s is not a NameNode protocol", xface);
+    try {
+      Constructor<FailoverProxyProvider<?>> ctor = failoverProxyProviderClass
+          .getConstructor(Class.class);
+      FailoverProxyProvider<?> provider = ctor.newInstance(xface);
+      ReflectionUtils.setConf(provider, conf);
+      return (FailoverProxyProvider<T>) provider;
+    } catch (Exception e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new IOException(
+            "Couldn't create proxy provider " + failoverProxyProviderClass, e);
+      }
+    }
+  }
 
+  /** Gets the configured Failover proxy provider's class */
+  public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
+      Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
+    if (nameNodeUri == null) {
+      return null;
+    }
+    String host = nameNodeUri.getHost();
+
+    String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "."
+        + host;
+    try {
+      @SuppressWarnings("unchecked")
+      Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf
+          .getClass(configKey, null, FailoverProxyProvider.class);
+      if (ret != null) {
+        // If we found a proxy provider, then this URI should be a logical NN.
+        // Given that, it shouldn't have a non-default port number.
+        int port = nameNodeUri.getPort();
+        if (port > 0 && port != NameNode.DEFAULT_PORT) {
+          throw new IOException("Port " + port + " specified in URI "
+              + nameNodeUri + " but host '" + host
+              + "' is a logical (HA) namenode"
+              + " and does not use port information.");
+        }
+      }
+      return ret;
+    } catch (RuntimeException e) {
+      if (e.getCause() instanceof ClassNotFoundException) {
+        throw new IOException("Could not load failover proxy provider class "
+            + conf.get(configKey) + " which is configured for authority "
+            + nameNodeUri, e);
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /** Creates the namenode proxy with the passed Protocol */
+  @SuppressWarnings("unchecked")
+  public static Object createFailoverProxy(Configuration conf, URI nameNodeUri,
+      Class xface) throws IOException {
+    Class<FailoverProxyProvider<?>> failoverProxyProviderClass = HAUtil
+        .getFailoverProxyProviderClass(conf, nameNodeUri, xface);
+    if (failoverProxyProviderClass != null) {
+      FailoverProxyProvider<?> failoverProxyProvider = HAUtil
+          .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface);
+      Conf config = new Conf(conf);
+      return RetryProxy.create(xface, failoverProxyProvider, RetryPolicies
+          .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+              config.maxFailoverAttempts, config.failoverSleepBaseMillis,
+              config.failoverSleepMaxMillis));
+    }
+    return null;
+  }
+  
 }
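
A note on the precondition used in both new methods: NamenodeProtocols is the
aggregate interface the NameNode implements, so
xface.isAssignableFrom(NamenodeProtocols.class) holds exactly when xface is one
of the NameNode's protocol interfaces. A toy illustration (interface names
invented for demonstration):

public class PreconditionSketch {
  interface ClientProto {}        // stands in for ClientProtocol
  interface NamenodeProto {}      // stands in for NamenodeProtocol
  interface AllProtos extends ClientProto, NamenodeProto {} // NamenodeProtocols

  public static void main(String[] args) {
    // True: the aggregate interface extends each individual protocol.
    System.out.println(ClientProto.class.isAssignableFrom(AllProtos.class));
    System.out.println(NamenodeProto.class.isAssignableFrom(AllProtos.class));
    // False: an unrelated interface fails the precondition.
    System.out.println(Runnable.class.isAssignableFrom(AllProtos.class));
  }
}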

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Tue Jan 17 03:10:25 2012
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -58,7 +57,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 
-import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 
 /**
@@ -88,7 +86,8 @@ class NameNodeConnector {
     InetSocketAddress nn = Lists.newArrayList(haNNs).get(0);
     // TODO(HA): need to deal with connecting to HA NN pair here
     this.namenodeAddress = nn;
-    this.namenode = createNamenode(nn, conf);
+    this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(nn, conf,
+        UserGroupInformation.getCurrentUser());
     this.client = DFSUtil.createNamenode(conf);
     this.fs = FileSystem.get(NameNode.getUri(nn), conf);
 
@@ -196,33 +195,6 @@ class NameNodeConnector {
         + "]";
   }
 
-  /** Build a NamenodeProtocol connection to the namenode and
-   * set up the retry policy
-   */ 
-  private static NamenodeProtocol createNamenode(InetSocketAddress address,
-      Configuration conf) throws IOException {
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        5, 200, TimeUnit.MILLISECONDS);
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        timeoutPolicy, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap =
-        new HashMap<String, RetryPolicy>();
-    methodNameToPolicyMap.put("getBlocks", methodPolicy);
-    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-
-    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
-        ProtobufRpcEngine.class);
-    NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class,
-            RPC.getProtocolVersion(NamenodeProtocolPB.class), address,
-            UserGroupInformation.getCurrentUser(), conf,
-            NetUtils.getDefaultSocketFactory(conf));
-    NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
-        NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
-    return new NamenodeProtocolTranslatorPB(retryProxy);
-  }
-
   /**
    * Periodically updates access keys.
    */

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java?rev=1232284&r1=1232283&r2=1232284&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java Tue Jan 17 03:10:25 2012
@@ -32,52 +32,75 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A FailoverProxyProvider implementation which allows one to configure two URIs
  * to connect to during fail-over. The first configured address is tried first,
  * and on a fail-over event the other address is tried.
  */
-public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
-    Configurable {
+public class ConfiguredFailoverProxyProvider<T> implements
+    FailoverProxyProvider<T>, Configurable {
   
   private static final Log LOG =
       LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
   
   private Configuration conf;
   private int currentProxyIndex = 0;
-  private List<AddressRpcProxyPair> proxies = new ArrayList<AddressRpcProxyPair>();
+  private List<AddressRpcProxyPair<T>> proxies = new ArrayList<AddressRpcProxyPair<T>>();
   private UserGroupInformation ugi;
+  private final Class<T> xface;
 
+  public ConfiguredFailoverProxyProvider(Class<T> xface) {
+    Preconditions.checkArgument(
+        xface.isAssignableFrom(NamenodeProtocols.class),
+        "Interface class %s is not a valid NameNode protocol!");
+    this.xface = xface;
+  }
+    
   @Override
-  public Class<?> getInterface() {
-    return ClientProtocol.class;
+  public Class<T> getInterface() {
+    return xface;
   }
 
   /**
    * Lazily initialize the RPC proxy object.
    */
+  @SuppressWarnings("unchecked")
   @Override
-  public synchronized Object getProxy() {
+  public synchronized T getProxy() {
     AddressRpcProxyPair current = proxies.get(currentProxyIndex);
     if (current.namenode == null) {
       try {
-        // TODO(HA): This will create a NN proxy with an underlying retry
-        // proxy. We don't want this.
-        current.namenode = DFSUtil.createNamenode(current.address, conf, ugi);
+        if (NamenodeProtocol.class.equals(xface)) {
+          current.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(
+              current.address, conf, ugi);
+        } else if (ClientProtocol.class.equals(xface)) {
+          // TODO(HA): This will create a NN proxy with an underlying retry
+          // proxy. We don't want this.
+          current.namenode = DFSUtil.createNamenode(current.address, conf, ugi);
+        } else {
+          throw new IllegalStateException(
+              "Upsupported protocol found when creating the proxy conection to NameNode.
"
+                  + ((xface != null) ? xface.getClass().getName() : xface)
+                  + " is not supported by " + this.getClass().getName());
+        }
       } catch (IOException e) {
         LOG.error("Failed to create RPC proxy to NameNode", e);
         throw new RuntimeException(e);
       }
     }
-    return current.namenode;
+    return (T)current.namenode;
   }
 
   @Override
-  public synchronized void performFailover(Object currentProxy) {
+  public synchronized void performFailover(T currentProxy) {
     currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
   }
 
@@ -113,7 +136,7 @@ public class ConfiguredFailoverProxyProv
       Map<String, InetSocketAddress> addressesInNN = map.get(nsId);
       
       for (InetSocketAddress address : addressesInNN.values()) {
-        proxies.add(new AddressRpcProxyPair(address));
+        proxies.add(new AddressRpcProxyPair<T>(address));
       }
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -124,9 +147,9 @@ public class ConfiguredFailoverProxyProv
    * A little pair object to store the address and connected RPC proxy object to
    * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
    */
-  private static class AddressRpcProxyPair {
+  private static class AddressRpcProxyPair<T> {
     public InetSocketAddress address;
-    public ClientProtocol namenode;
+    public T namenode;
     
     public AddressRpcProxyPair(InetSocketAddress address) {
       this.address = address;
@@ -139,7 +162,7 @@ public class ConfiguredFailoverProxyProv
    */
   @Override
   public synchronized void close() throws IOException {
-    for (AddressRpcProxyPair proxy : proxies) {
+    for (AddressRpcProxyPair<T> proxy : proxies) {
       if (proxy.namenode != null) {
         if (proxy.namenode instanceof Closeable) {
           ((Closeable)proxy.namenode).close();
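
For completeness, a hedged sketch of how a provider like this is selected at
runtime: HAUtil.getFailoverProxyProviderClass() looks up the provider class
under the dfs.client.failover.proxy.provider prefix keyed by the logical host.
The logical host "mycluster" below is an assumption, not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.io.retry.FailoverProxyProvider;

public class ProviderConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Map the hypothetical logical host "mycluster" to this provider so that
    // HAUtil.getFailoverProxyProviderClass() can find it.
    conf.setClass(
        DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + ".mycluster",
        ConfiguredFailoverProxyProvider.class,
        FailoverProxyProvider.class);
    // HAUtil.createFailoverProxy(conf, URI.create("hdfs://mycluster"), xface)
    // will then construct the provider reflectively via its (Class) constructor
    // and wrap it with RetryPolicies.failoverOnNetworkException().
  }
}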


