hadoop-hdfs-commits mailing list archives

From: ji...@apache.org
Subject: svn commit: r1579303 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdf...
Date: Wed, 19 Mar 2014 17:33:51 GMT
Author: jing9
Date: Wed Mar 19 17:33:51 2014
New Revision: 1579303

URL: http://svn.apache.org/r1579303
Log:
HDFS-6100. Merge r1579301 from trunk.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeAddressParam.java
      - copied unchanged from r1579301, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeAddressParam.java
Removed:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1579303&r1=1579302&r2=1579303&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Mar 19 17:33:51 2014
@@ -405,6 +405,9 @@ Release 2.4.0 - UNRELEASED
 
     HDFS-6099. HDFS file system limits not enforced on renames. (cnauroth)
 
+    HDFS-6100. DataNodeWebHdfsMethods does not failover in HA mode. (Haohui Mai
+    via jing9)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

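For context: with this change the datanode side of WebHDFS identifies the namenode by a nameservice id (or host:port in non-HA setups) rather than a fixed RPC socket address, so operations can fail over. A minimal client-side sketch, assuming a hypothetical HA nameservice "mycluster" with namenodes nn1 and nn2 (all host names, ports and paths below are placeholders, not taken from this commit):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsHaClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Client-side HA wiring for the hypothetical nameservice "mycluster".
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "nn1.example.com:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "nn2.example.com:8020");
    conf.set("dfs.namenode.http-address.mycluster.nn1", "nn1.example.com:50070");
    conf.set("dfs.namenode.http-address.mycluster.nn2", "nn2.example.com:50070");
    conf.set("dfs.client.failover.proxy.provider.mycluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    // The webhdfs URI names the logical nameservice, not a single namenode.
    FileSystem fs = FileSystem.get(URI.create("webhdfs://mycluster"), conf);
    FSDataOutputStream out = fs.create(new Path("/tmp/webhdfs-ha-demo"));
    out.write("hello".getBytes("UTF-8"));
    out.close();  // the datanode-side write path can now fail over
    fs.close();   // (see testFailoverAfterOpen in TestWebHDFSForHA below)
  }
}
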
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1579303&r1=1579302&r2=1579303&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Wed Mar 19 17:33:51 2014
@@ -19,13 +19,13 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
 
 import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
@@ -40,6 +40,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,12 +49,14 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.hdfs.web.ParamFilter;
+import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
@@ -61,7 +64,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -86,18 +90,19 @@ public class DatanodeWebHdfsMethods {
   private static final UriFsPathParam ROOT = new UriFsPathParam("");
 
   private @Context ServletContext context;
+  private @Context HttpServletRequest request;
   private @Context HttpServletResponse response;
 
   private void init(final UserGroupInformation ugi,
-      final DelegationParam delegation, final InetSocketAddress nnRpcAddr,
+      final DelegationParam delegation, final String nnId,
       final UriFsPathParam path, final HttpOpParam<?> op,
       final Param<?, ?>... parameters) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
           + ", ugi=" + ugi + Param.toSortedString(", ", parameters));
     }
-    if (nnRpcAddr == null) {
-      throw new IllegalArgumentException(NamenodeRpcAddressParam.NAME
+    if (nnId == null) {
+      throw new IllegalArgumentException(NamenodeAddressParam.NAME
           + " is not specified.");
     }
 
@@ -106,15 +111,32 @@ public class DatanodeWebHdfsMethods {
     
     if (UserGroupInformation.isSecurityEnabled()) {
       //add a token for RPC.
-      final Token<DelegationTokenIdentifier> token = 
-          new Token<DelegationTokenIdentifier>();
-      token.decodeFromUrlString(delegation.getValue());
-      SecurityUtil.setTokenService(token, nnRpcAddr);
-      token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+      final Token<DelegationTokenIdentifier> token = deserializeToken
+              (delegation.getValue(), nnId);
       ugi.addToken(token);
     }
   }
 
+  @VisibleForTesting
+  Token<DelegationTokenIdentifier> deserializeToken
+          (String delegation,String nnId) throws IOException {
+    final DataNode datanode = (DataNode) context.getAttribute("datanode");
+    final Configuration conf = datanode.getConf();
+    final Token<DelegationTokenIdentifier> token = new
+            Token<DelegationTokenIdentifier>();
+    token.decodeFromUrlString(delegation);
+    URI nnUri = URI.create(HdfsConstants.HDFS_URI_SCHEME +
+            "://" + nnId);
+    boolean isHA = HAUtil.isLogicalUri(conf, nnUri);
+    if (isHA) {
+      token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri));
+    } else {
+      token.setService(SecurityUtil.buildTokenService(nnUri));
+    }
+    token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    return token;
+  }
+
   /** Handle HTTP PUT request for the root. */
   @PUT
   @Path("/")
@@ -125,9 +147,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME) 
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT) 
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
       @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
@@ -141,8 +163,8 @@ public class DatanodeWebHdfsMethods {
       @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
           final BlockSizeParam blockSize
       ) throws IOException, InterruptedException {
-    return put(in, ugi, delegation, namenodeRpcAddress, ROOT, op, permission,
-        overwrite, bufferSize, replication, blockSize);
+    return put(in, ugi, delegation, namenode, ROOT, op, permission,
+            overwrite, bufferSize, replication, blockSize);
   }
 
   /** Handle HTTP PUT request. */
@@ -155,9 +177,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
@@ -173,24 +195,22 @@ public class DatanodeWebHdfsMethods {
           final BlockSizeParam blockSize
       ) throws IOException, InterruptedException {
 
-    final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
-    init(ugi, delegation, nnRpcAddr, path, op, permission,
+    final String nnId = namenode.getValue();
+    init(ugi, delegation, nnId, path, op, permission,
         overwrite, bufferSize, replication, blockSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        return put(in, ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op,
-            permission, overwrite, bufferSize, replication, blockSize);
+        return put(in, nnId, path.getAbsolutePath(), op,
+                permission, overwrite, bufferSize, replication, blockSize);
       }
     });
   }
 
   private Response put(
       final InputStream in,
-      final UserGroupInformation ugi,
-      final DelegationParam delegation,
-      final InetSocketAddress nnRpcAddr,
+      final String nnId,
       final String fullpath,
       final PutOpParam op,
       final PermissionParam permission,
@@ -208,7 +228,7 @@ public class DatanodeWebHdfsMethods {
       conf.set(FsPermission.UMASK_LABEL, "000");
 
       final int b = bufferSize.getValue(conf);
-      DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      DFSClient dfsclient = newDfsClient(nnId, conf);
       FSDataOutputStream out = null;
       try {
         out = new FSDataOutputStream(dfsclient.create(
@@ -225,9 +245,10 @@ public class DatanodeWebHdfsMethods {
         IOUtils.cleanup(LOG, out);
         IOUtils.cleanup(LOG, dfsclient);
       }
-      final InetSocketAddress nnHttpAddr = NameNode.getHttpAddress(conf);
-      final URI uri = new URI(WebHdfsFileSystem.SCHEME, null,
-          nnHttpAddr.getHostName(), nnHttpAddr.getPort(), fullpath, null, null);
+      final String scheme = "http".equals(request.getScheme()) ?
+      WebHdfsFileSystem.SCHEME : SWebHdfsFileSystem.SCHEME;
+      final URI uri = URI.create(String.format("%s://%s/%s", scheme,
+              nnId, fullpath));
       return Response.created(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
@@ -245,15 +266,15 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return post(in, ugi, delegation, namenodeRpcAddress, ROOT, op, bufferSize);
+    return post(in, ugi, delegation, namenode, ROOT, op, bufferSize);
   }
 
   /** Handle HTTP POST request. */
@@ -266,9 +287,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
@@ -276,23 +297,21 @@ public class DatanodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
-    init(ugi, delegation, nnRpcAddr, path, op, bufferSize);
+    final String nnId = namenode.getValue();
+    init(ugi, delegation, nnId, path, op, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException {
-        return post(in, ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op,
-            bufferSize);
+        return post(in, nnId, path.getAbsolutePath(), op,
+                bufferSize);
       }
     });
   }
 
   private Response post(
       final InputStream in,
-      final UserGroupInformation ugi,
-      final DelegationParam delegation,
-      final InetSocketAddress nnRpcAddr,
+      final String nnId,
       final String fullpath,
       final PostOpParam op,
       final BufferSizeParam bufferSize
@@ -304,7 +323,7 @@ public class DatanodeWebHdfsMethods {
     {
       final Configuration conf = new Configuration(datanode.getConf());
       final int b = bufferSize.getValue(conf);
-      DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      DFSClient dfsclient = newDfsClient(nnId, conf);
       FSDataOutputStream out = null;
       try {
         out = dfsclient.append(fullpath, b, null, null);
@@ -332,9 +351,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
       @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@@ -344,7 +363,7 @@ public class DatanodeWebHdfsMethods {
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return get(ugi, delegation, namenodeRpcAddress, ROOT, op, offset, length,
+    return get(ugi, delegation, namenode, ROOT, op, offset, length,
         bufferSize);
   }
 
@@ -356,9 +375,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
@@ -370,22 +389,20 @@ public class DatanodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
-    init(ugi, delegation, nnRpcAddr, path, op, offset, length, bufferSize);
+    final String nnId = namenode.getValue();
+    init(ugi, delegation, nnId, path, op, offset, length, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException {
-        return get(ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op,
-            offset, length, bufferSize);
+        return get(nnId, path.getAbsolutePath(), op, offset,
+                length, bufferSize);
       }
     });
   }
 
   private Response get(
-      final UserGroupInformation ugi,
-      final DelegationParam delegation,
-      final InetSocketAddress nnRpcAddr,
+      final String nnId,
       final String fullpath,
       final GetOpParam op,
       final OffsetParam offset,
@@ -399,7 +416,7 @@ public class DatanodeWebHdfsMethods {
     case OPEN:
     {
       final int b = bufferSize.getValue(conf);
-      final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      final DFSClient dfsclient = newDfsClient(nnId, conf);
       HdfsDataInputStream in = null;
       try {
         in = new HdfsDataInputStream(dfsclient.open(fullpath, b, true));
@@ -426,7 +443,7 @@ public class DatanodeWebHdfsMethods {
     case GETFILECHECKSUM:
     {
       MD5MD5CRC32FileChecksum checksum = null;
-      DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      DFSClient dfsclient = newDfsClient(nnId, conf);
       try {
         checksum = dfsclient.getFileChecksum(fullpath);
         dfsclient.close();
@@ -441,4 +458,10 @@ public class DatanodeWebHdfsMethods {
       throw new UnsupportedOperationException(op + " is not supported");
     }
   }
+
+  private static DFSClient newDfsClient(String nnId,
+                                        Configuration conf) throws IOException {
+    URI uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + nnId);
+    return new DFSClient(uri, conf);
+  }
 }

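The new deserializeToken and newDfsClient helpers above both turn the namenode id from the request URL back into a URI and branch on whether it is a logical (HA) name. A condensed sketch of that logic, using the same calls the patch makes (the nameservice "mycluster" in the comments is hypothetical):

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;

public class NamenodeIdSketch {
  // Token service for the delegation token, given the namenode id from the URL.
  static Text tokenServiceFor(String nnId, Configuration conf) throws IOException {
    URI nnUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + nnId);
    return HAUtil.isLogicalUri(conf, nnUri)
        ? HAUtil.buildTokenServiceForLogicalUri(nnUri)  // logical name, e.g. "mycluster"
        : SecurityUtil.buildTokenService(nnUri);        // resolved ip:port
  }

  // DFSClient bound to "hdfs://<nnId>"; for a logical id this goes through the
  // configured failover proxy provider instead of a single RPC address.
  static DFSClient clientFor(String nnId, Configuration conf) throws IOException {
    return new DFSClient(URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + nnId), conf);
  }
}
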
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1579303&r1=1579302&r2=1579303&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Mar 19 17:33:51 2014
@@ -126,7 +126,7 @@ public class NameNode implements NameNod
   static{
     HdfsConfiguration.init();
   }
-  
+
   /**
    * Categories of operations supported by the namenode.
    */
@@ -269,6 +269,11 @@ public class NameNode implements NameNod
 
   private JvmPauseMonitor pauseMonitor;
   private ObjectName nameNodeStatusBeanName;
+  /**
+   * The service name of the delegation token issued by the namenode. It is
+   * the name service id in HA mode, or the rpc address in non-HA mode.
+   */
+  private String tokenServiceName;
   
   /** Format a new filesystem.  Destroys any filesystem that may already
    * exist at this location.  **/
@@ -306,6 +311,13 @@ public class NameNode implements NameNod
     return startupProgress;
   }
 
+  /**
+   * Return the service name of the issued delegation token.
+   *
+   * @return The name service id in HA-mode, or the rpc address in non-HA mode
+   */
+  public String getTokenServiceName() { return tokenServiceName; }
+
   public static InetSocketAddress getAddress(String address) {
     return NetUtils.createSocketAddr(address, DEFAULT_PORT);
   }
@@ -499,6 +511,9 @@ public class NameNode implements NameNod
     loadNamesystem(conf);
 
     rpcServer = createRpcServer(conf);
+    final String nsId = getNameServiceId(conf);
+    tokenServiceName = HAUtil.isHAEnabled(conf, nsId) ? nsId : NetUtils
+            .getHostPortString(rpcServer.getRpcAddress());
     if (NamenodeRole.NAMENODE == role) {
       httpServer.setNameNodeAddress(getNameNodeAddress());
       httpServer.setFSImage(getFSImage());

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1579303&r1=1579302&r2=1579303&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Wed Mar 19 17:33:51 2014
@@ -86,7 +86,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.hdfs.web.resources.LengthParam;
 import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
-import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.hdfs.web.resources.OwnerParam;
@@ -275,7 +275,7 @@ public class NamenodeWebHdfsMethods {
       delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
     }
     final String query = op.toQueryString() + delegationQuery
-        + "&" + new NamenodeRpcAddressParam(namenode)
+        + "&" + new NamenodeAddressParam(namenode)
         + Param.toSortedString("&", parameters);
     final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1579303&r1=1579302&r2=1579303&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Mar 19 17:33:51 2014
@@ -53,8 +53,11 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha
+        .ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.web.TestWebHDFSForHA;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
@@ -136,7 +139,22 @@ public class DFSTestUtil {
 
     NameNode.format(conf);
   }
-  
+
+  /**
+   * Create a new HA-enabled configuration.
+   */
+  public static Configuration newHAConfiguration(final String logicalName) {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
+    conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
+            logicalName), "nn1,nn2");
+    conf.set(DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "" +
+            "." + logicalName,
+            ConfiguredFailoverProxyProvider.class.getName());
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    return conf;
+  }
+
   /** class MyFile contains enough information to recreate the contents of
    * a single file.
    */

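The newHAConfiguration helper above wires only the client-side HA keys; the per-namenode addresses are filled in by HATestUtil.setFailoverConfigurations in the tests below. Assuming the usual literal values of the DFSConfigKeys constants it references, the helper amounts to roughly the following (with the logical name, e.g. "minidfs", supplied by the caller):

import org.apache.hadoop.conf.Configuration;

public class NewHAConfigurationSketch {
  static Configuration sketch(String logicalName) {
    Configuration conf = new Configuration();
    conf.set("dfs.nameservices", logicalName);
    conf.set("dfs.ha.namenodes." + logicalName, "nn1,nn2");
    conf.set("dfs.client.failover.proxy.provider." + logicalName,
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    conf.setInt("dfs.replication", 1);  // the mini cluster in the tests runs one datanode
    return conf;
  }
}
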
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java?rev=1579303&r1=1579302&r2=1579303&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java Wed Mar 19 17:33:51 2014
@@ -18,23 +18,25 @@
 
 package org.apache.hadoop.hdfs.web;
 
-import java.io.IOException;
-import java.net.URI;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
 import org.junit.Test;
 
-/** Test whether WebHDFS can connect to an HA cluster */
+import java.io.IOException;
+import java.net.URI;
+
 public class TestWebHDFSForHA {
   private static final String LOGICAL_NAME = "minidfs";
+  private static final URI WEBHDFS_URI = URI.create(WebHdfsFileSystem.SCHEME +
+          "://" + LOGICAL_NAME);
   private static final MiniDFSNNTopology topo = new MiniDFSNNTopology()
       .addNameservice(new MiniDFSNNTopology.NSConf(LOGICAL_NAME).addNN(
           new MiniDFSNNTopology.NNConf("nn1")).addNN(
@@ -42,8 +44,7 @@ public class TestWebHDFSForHA {
 
   @Test
   public void testHA() throws IOException {
-    Configuration conf = new Configuration();
-    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
     MiniDFSCluster cluster = null;
     FileSystem fs = null;
     try {
@@ -54,8 +55,7 @@ public class TestWebHDFSForHA {
 
       cluster.waitActive();
 
-      final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME;
-      fs = FileSystem.get(URI.create(uri), conf);
+      fs = FileSystem.get(WEBHDFS_URI, conf);
       cluster.transitionToActive(0);
 
       final Path dir = new Path("/test");
@@ -67,9 +67,7 @@ public class TestWebHDFSForHA {
       final Path dir2 = new Path("/test2");
       Assert.assertTrue(fs.mkdirs(dir2));
     } finally {
-      if (fs != null) {
-        fs.close();
-      }
+      IOUtils.cleanup(null, fs);
       if (cluster != null) {
         cluster.shutdown();
       }
@@ -78,10 +76,9 @@ public class TestWebHDFSForHA {
 
   @Test
   public void testSecureHA() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
         true);
-    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
 
     MiniDFSCluster cluster = null;
     WebHdfsFileSystem fs = null;
@@ -92,8 +89,7 @@ public class TestWebHDFSForHA {
       HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
       cluster.waitActive();
 
-      final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME;
-      fs = (WebHdfsFileSystem) FileSystem.get(URI.create(uri), conf);
+      fs = (WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf);
 
       cluster.transitionToActive(0);
       Token<?> token = fs.getDelegationToken(null);
@@ -104,9 +100,44 @@ public class TestWebHDFSForHA {
       fs.renewDelegationToken(token);
       fs.cancelDelegationToken(token);
     } finally {
-      if (fs != null) {
-        fs.close();
+      IOUtils.cleanup(null, fs);
+      if (cluster != null) {
+        cluster.shutdown();
       }
+    }
+  }
+
+  @Test
+  public void testFailoverAfterOpen() throws IOException {
+    Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    final Path p = new Path("/test");
+    final byte[] data = "Hello".getBytes();
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo)
+              .numDataNodes(1).build();
+
+      HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
+
+      cluster.waitActive();
+
+      fs = FileSystem.get(WEBHDFS_URI, conf);
+      cluster.transitionToActive(1);
+
+      FSDataOutputStream out = fs.create(p);
+      cluster.shutdownNameNode(1);
+      cluster.transitionToActive(0);
+
+      out.write(data);
+      out.close();
+      FSDataInputStream in = fs.open(p);
+      byte[] buf = new byte[data.length];
+      IOUtils.readFully(in, buf, 0, buf.length);
+      Assert.assertArrayEquals(data, buf);
+    } finally {
+      IOUtils.cleanup(null, fs);
       if (cluster != null) {
         cluster.shutdown();
       }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java?rev=1579303&r1=1579302&r2=1579303&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java Wed Mar 19 17:33:51 2014
@@ -43,13 +43,8 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.web.resources.DoAsParam;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
-import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.hadoop.hdfs.web.resources.*;
+import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -465,7 +460,7 @@ public class TestWebHdfsFileSystemContra
       AppendTestUtil.testAppend(fs, new Path(dir, "append"));
     }
 
-    {//test NamenodeRpcAddressParam not set.
+    {//test NamenodeAddressParam not set.
       final HttpOpParam.Op op = PutOpParam.Op.CREATE;
       final URL url = webhdfs.toUrl(op, dir);
       HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -476,9 +471,9 @@ public class TestWebHdfsFileSystemContra
       final String redirect = conn.getHeaderField("Location");
       conn.disconnect();
 
-      //remove NamenodeRpcAddressParam
+      //remove NamenodeAddressParam
       WebHdfsFileSystem.LOG.info("redirect = " + redirect);
-      final int i = redirect.indexOf(NamenodeRpcAddressParam.NAME);
+      final int i = redirect.indexOf(NamenodeAddressParam.NAME);
       final int j = redirect.indexOf("&", i);
       String modified = redirect.substring(0, i - 1) + redirect.substring(j);
       WebHdfsFileSystem.LOG.info("modified = " + modified);


