hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1179896 - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdf...
Date Thu, 06 Oct 2011 23:26:15 GMT
Author: atm
Date: Thu Oct  6 23:26:14 2011
New Revision: 1179896

URL: http://svn.apache.org/viewvc?rev=1179896&view=rev
Log:
HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active
namenode. (atm)

Added:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
Thu Oct  6 23:26:14 2011
@@ -9,3 +9,5 @@ HDFS-2179. Add fencing framework and mec
 HDFS-1974. Introduce active and standy states to the namenode. (suresh)
 
 HDFS-2407. getServerDefaults and getStats don't check operation category (atm)
+
+HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
Thu Oct  6 23:26:14 2011
@@ -80,8 +80,7 @@ public class Hdfs extends AbstractFileSy
       throw new IOException("Incomplete HDFS URI, no host: " + theUri);
     }
 
-    InetSocketAddress namenode = NameNode.getAddress(theUri.getAuthority());
-    this.dfs = new DFSClient(namenode, conf, getStatistics());
+    this.dfs = new DFSClient(theUri, conf, getStatistics());
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
Thu Oct  6 23:26:14 2011
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -26,11 +25,11 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.URI;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
@@ -87,6 +86,9 @@ import org.apache.hadoop.io.EnumSetWrita
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -96,6 +98,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -199,7 +202,7 @@ public class DFSClient implements java.i
    */
   private final Map<String, DFSOutputStream> filesBeingWritten
       = new HashMap<String, DFSOutputStream>();
-        
+
   /**
    * Same as this(NameNode.getAddress(conf), conf);
    * @see #DFSClient(InetSocketAddress, Configuration)
@@ -209,12 +212,16 @@ public class DFSClient implements java.i
   public DFSClient(Configuration conf) throws IOException {
     this(NameNode.getAddress(conf), conf);
   }
+  
+  public DFSClient(InetSocketAddress address, Configuration conf) throws IOException {
+    this(NameNode.getUri(address), conf);
+  }
 
   /**
    * Same as this(nameNodeAddr, conf, null);
    * @see #DFSClient(InetSocketAddress, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
    */
-  public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf
+  public DFSClient(URI nameNodeAddr, Configuration conf
       ) throws IOException {
     this(nameNodeAddr, conf, null);
   }
@@ -223,17 +230,17 @@ public class DFSClient implements java.i
    * Same as this(nameNodeAddr, null, conf, stats);
    * @see #DFSClient(InetSocketAddress, ClientProtocol, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
    */
-  public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
+  public DFSClient(URI nameNodeAddr, Configuration conf,
                    FileSystem.Statistics stats)
     throws IOException {
     this(nameNodeAddr, null, conf, stats);
   }
-
+  
   /** 
    * Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
    * Exactly one of nameNodeAddr or rpcNamenode must be null.
    */
-  DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
+  DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
       Configuration conf, FileSystem.Statistics stats)
     throws IOException {
     // Copy only the required DFSClient configuration
@@ -246,20 +253,45 @@ public class DFSClient implements java.i
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
     this.ugi = UserGroupInformation.getCurrentUser();
-    final String authority = nameNodeAddr == null? "null":
-        nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
+    
+    final String authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
     this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
     this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
+    
     this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
-    if (nameNodeAddr != null && rpcNamenode == null) {
-      this.namenode = DFSUtil.createNamenode(nameNodeAddr, conf);
-    } else if (nameNodeAddr == null && rpcNamenode != null) {
+    
+    Class<?> failoverProxyProviderClass = getFailoverProxyProviderClass(authority, conf);
+    
+    if (nameNodeUri != null && failoverProxyProviderClass != null) {
+      FailoverProxyProvider failoverProxyProvider = (FailoverProxyProvider)
+          ReflectionUtils.newInstance(failoverProxyProviderClass, conf);
+      this.namenode = (ClientProtocol)RetryProxy.create(ClientProtocol.class,
+          failoverProxyProvider, RetryPolicies.failoverOnNetworkException(1));
+    } else if (nameNodeUri != null && rpcNamenode == null) {
+      this.namenode = DFSUtil.createNamenode(NameNode.getAddress(nameNodeUri), conf);
+    } else if (nameNodeUri == null && rpcNamenode != null) {
       //This case is used for testing.
       this.namenode = rpcNamenode;
     } else {
       throw new IllegalArgumentException(
           "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
-          + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
+          + "nameNodeAddr=" + nameNodeUri + ", rpcNamenode=" + rpcNamenode);
+    }
+  }
+  
+  private Class<?> getFailoverProxyProviderClass(String authority, Configuration conf)
+      throws IOException {
+    String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + authority;
+    try {
+      return conf.getClass(configKey, null);
+    } catch (RuntimeException e) {
+      if (e.getCause() instanceof ClassNotFoundException) {
+        throw new IOException("Could not load failover proxy provider class "
+            + conf.get(configKey) + " which is configured for authority " + authority,
+            e);
+      } else {
+        throw e;
+      }
     }
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
Thu Oct  6 23:26:14 2011
@@ -46,6 +46,7 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
   public static final String  DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
   public static final int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
+  public static final String  DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
   
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
Thu Oct  6 23:26:14 2011
@@ -617,15 +617,19 @@ public class DFSUtil {
   }
 
   /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
+  public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
       Configuration conf) throws IOException {
-    return createNamenode(createRPCNamenode(nameNodeAddr, conf,
-        UserGroupInformation.getCurrentUser()));
-    
+    return createNamenode(nameNodeAddr, conf, UserGroupInformation.getCurrentUser());
+  }
+  
+  /** Create a {@link NameNode} proxy */
+  public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
+      Configuration conf, UserGroupInformation ugi) throws IOException {
+    return createNamenode(createRPCNamenode(nameNodeAddr, conf, ugi));
   }
 
   /** Create a {@link NameNode} proxy */
-  static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
+  public static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
       Configuration conf, UserGroupInformation ugi) 
     throws IOException {
     return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
@@ -634,7 +638,7 @@ public class DFSUtil {
   }
 
   /** Create a {@link NameNode} proxy */
-  static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
+  public static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
     throws IOException {
     RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
         5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
Thu Oct  6 23:26:14 2011
@@ -106,8 +106,7 @@ public class DistributedFileSystem exten
       throw new IOException("Incomplete HDFS URI, no host: "+ uri);
     }
 
-    InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
-    this.dfs = new DFSClient(namenode, conf, statistics);
+    this.dfs = new DFSClient(uri, conf, statistics);
     this.uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + uri.getAuthority());
     this.workingDir = getHomeDirectory();
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
Thu Oct  6 23:26:14 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.KerberosInfo;
@@ -99,6 +100,7 @@ public interface ClientProtocol extends 
    * @throws IOException If an I/O error occurred
    */
   @Nullable
+  @Idempotent
   public LocatedBlocks getBlockLocations(String src,
                                          long offset,
                                          long length) 
@@ -249,7 +251,7 @@ public interface ClientProtocol extends 
       UnresolvedLinkException, IOException;
 
   /**
-   * The client can give up on a blcok by calling abandonBlock().
+   * The client can give up on a block by calling abandonBlock().
    * The client can then
    * either obtain a new block, or complete or abandon the file.
    * Any partial writes to the block will be discarded.
@@ -721,6 +723,7 @@ public interface ClientProtocol extends 
    * @throws IOException If an I/O error occurred        
    */
   @Nullable
+  @Idempotent
   public HdfsFileStatus getFileInfo(String src) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, IOException;
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
(original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
Thu Oct  6 23:26:14 2011
@@ -267,7 +267,7 @@ public class NameNode {
    * @param filesystemURI
    * @return address of file system
    */
-  static InetSocketAddress getAddress(URI filesystemURI) {
+  public static InetSocketAddress getAddress(URI filesystemURI) {
     String authority = filesystemURI.getAuthority();
     if (authority == null) {
       throw new IllegalArgumentException(String.format(

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java?rev=1179896&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
(added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
Thu Oct  6 23:26:14 2011
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * A FailoverProxyProvider implementation which allows one to configure two URIs
+ * to connect to during fail-over. The first configured address is tried first,
+ * and on a fail-over event the other address is tried.
+ */
+public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
+    Configurable {
+  
+  public static final String CONFIGURED_NAMENODE_ADDRESSES
+      = "dfs.ha.namenode.addresses";
+  
+  private static final Log LOG =
+      LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
+  
+  private Configuration conf;
+  private int currentProxyIndex = 0;
+  private List<AddressRpcProxyPair> proxies = new ArrayList<AddressRpcProxyPair>();
+  private UserGroupInformation ugi;
+
+  @Override
+  public Class<?> getInterface() {
+    return ClientProtocol.class;
+  }
+
+  /**
+   * Lazily initialize the RPC proxy object.
+   */
+  @Override
+  public synchronized Object getProxy() {
+    AddressRpcProxyPair current = proxies.get(currentProxyIndex);
+    if (current.namenode == null) {
+      try {
+        current.namenode = DFSUtil.createRPCNamenode(current.address, conf, ugi);
+      } catch (IOException e) {
+        LOG.error("Failed to create RPC proxy to NameNode", e);
+        throw new RuntimeException(e);
+      }
+    }
+    return current.namenode;
+  }
+
+  @Override
+  public synchronized void performFailover(Object currentProxy) {
+    currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
+  }
+
+  @Override
+  public synchronized Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public synchronized void setConf(Configuration conf) {
+    this.conf = conf;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+      
+      Collection<String> addresses = conf.getTrimmedStringCollection(
+          CONFIGURED_NAMENODE_ADDRESSES);
+      if (addresses == null || addresses.size() == 0) {
+        throw new RuntimeException(this.getClass().getSimpleName() +
+            " is configured but " + CONFIGURED_NAMENODE_ADDRESSES +
+            " is not set.");
+      }
+      for (String address : addresses) {
+        proxies.add(new AddressRpcProxyPair(
+            NameNode.getAddress(new URI(address).getAuthority())));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Malformed URI set in " +
+          CONFIGURED_NAMENODE_ADDRESSES, e);
+    }
+  }
+
+  /**
+   * A little pair object to store the address and connected RPC proxy object to
+   * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
+   */
+  private static class AddressRpcProxyPair {
+    public InetSocketAddress address;
+    public ClientProtocol namenode;
+    
+    public AddressRpcProxyPair(InetSocketAddress address) {
+      this.address = address;
+    }
+  }
+
+  /**
+   * Close all the proxy objects which have been opened over the lifetime of
+   * this proxy provider.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    for (AddressRpcProxyPair proxy : proxies) {
+      if (proxy.namenode != null) {
+        RPC.stopProxy(proxy.namenode);
+      }
+    }
+  }
+}

Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java?rev=1179896&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
(added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
Thu Oct  6 23:26:14 2011
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDFSClientFailover {
+  
+  private static final Path TEST_FILE = new Path("/tmp/failover-test-file");
+  private static final int FILE_LENGTH_TO_VERIFY = 100;
+  
+  private Configuration conf = new Configuration();
+  private MiniDFSCluster cluster;
+  
+  @Before
+  public void setUpCluster() throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2).build();
+    cluster.waitActive();
+  }
+  
+  @After
+  public void tearDownCluster() throws IOException {
+    cluster.shutdown();
+  }
+  
+  // TODO(HA): This test should probably be made to fail if a client fails over
+  // to talk to an NN with a different block pool id. Once failover between
+  // active/standby in a single block pool is implemented, this test should be
+  // changed to exercise that.
+  @Test
+  public void testDfsClientFailover() throws IOException, URISyntaxException {
+    final String nameServiceId = "name-service-uri";
+    InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
+    InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
+    
+    ClientProtocol nn1 = DFSUtil.createNamenode(nnAddr1, conf);
+    ClientProtocol nn2 = DFSUtil.createNamenode(nnAddr2, conf);
+    
+    DFSClient dfsClient1 = new DFSClient(null, nn1, conf, null);
+    DFSClient dfsClient2 = new DFSClient(null, nn2, conf, null);
+    
+    OutputStream out1 = dfsClient1.create(TEST_FILE.toString(), false);
+    OutputStream out2 = dfsClient2.create(TEST_FILE.toString(), false);
+    AppendTestUtil.write(out1, 0, FILE_LENGTH_TO_VERIFY);
+    AppendTestUtil.write(out2, 0, FILE_LENGTH_TO_VERIFY);
+    out1.close();
+    out2.close();
+    
+    String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
+    String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
+    conf.set(ConfiguredFailoverProxyProvider.CONFIGURED_NAMENODE_ADDRESSES,
+        address1 + "," + address2);
+        
+    conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + nameServiceId,
+        ConfiguredFailoverProxyProvider.class.getName());
+    
+    FileSystem fs = FileSystem.get(new URI("hdfs://" + nameServiceId), conf);
+    
+    AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
+    cluster.getNameNode(0).stop();
+    AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
+    
+    fs.close();
+  }
+
+}
\ No newline at end of file



Mime
View raw message