hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1214027 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/ src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/ src/main/java/org/apache/hadoop/h...
Date Wed, 14 Dec 2011 01:47:58 GMT
Author: szetszwo
Date: Wed Dec 14 01:47:57 2011
New Revision: 1214027

URL: http://svn.apache.org/viewvc?rev=1214027&view=rev
Log:
HDFS-2545. Change WebHDFS to support multiple namenodes in federation.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1214027&r1=1214026&r2=1214027&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Dec 14 01:47:57 2011
@@ -175,6 +175,9 @@ Release 0.23.1 - UNRELEASED
     HDFS-2594. Support getDelegationTokens and createSymlink in WebHDFS.
     (szetszwo)
 
+    HDFS-2545. Change WebHDFS to support multiple namenodes in federation.
+    (szetszwo)
+
   IMPROVEMENTS
     HDFS-2560. Refactor BPOfferService to be a static inner class (todd)
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1214027&r1=1214026&r2=1214027&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
Wed Dec 14 01:47:57 2011
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
@@ -89,7 +90,8 @@ public class DatanodeWebHdfsMethods {
   private @Context ServletContext context;
   private @Context HttpServletResponse response;
 
-  private void init(final UserGroupInformation ugi, final DelegationParam delegation,
+  private void init(final UserGroupInformation ugi,
+      final DelegationParam delegation, final InetSocketAddress nnRpcAddr,
       final UriFsPathParam path, final HttpOpParam<?> op,
       final Param<?, ?>... parameters) throws IOException {
     if (LOG.isTraceEnabled()) {
@@ -102,9 +104,8 @@ public class DatanodeWebHdfsMethods {
     
     if (UserGroupInformation.isSecurityEnabled()) {
       //add a token for RPC.
-      final DataNode datanode = (DataNode)context.getAttribute("datanode");
-      final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf());
-      final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
+      final Token<DelegationTokenIdentifier> token = 
+          new Token<DelegationTokenIdentifier>();
       token.decodeFromUrlString(delegation.getValue());
       SecurityUtil.setTokenService(token, nnRpcAddr);
       token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
@@ -122,6 +123,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(NamenodeRpcAddressParam.NAME) 
+      @DefaultValue(NamenodeRpcAddressParam.DEFAULT) 
+          final NamenodeRpcAddressParam namenodeRpcAddress,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
       @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
@@ -135,8 +139,8 @@ public class DatanodeWebHdfsMethods {
       @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
           final BlockSizeParam blockSize
       ) throws IOException, InterruptedException {
-    return put(in, ugi, delegation, ROOT, op, permission, overwrite, bufferSize,
-        replication, blockSize);
+    return put(in, ugi, delegation, namenodeRpcAddress, ROOT, op, permission,
+        overwrite, bufferSize, replication, blockSize);
   }
 
   /** Handle HTTP PUT request. */
@@ -149,6 +153,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(NamenodeRpcAddressParam.NAME)
+      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+          final NamenodeRpcAddressParam namenodeRpcAddress,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
@@ -164,8 +171,9 @@ public class DatanodeWebHdfsMethods {
           final BlockSizeParam blockSize
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, path, op, permission, overwrite, bufferSize,
-        replication, blockSize);
+    final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
+    init(ugi, delegation, nnRpcAddr, path, op, permission,
+        overwrite, bufferSize, replication, blockSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -178,7 +186,6 @@ public class DatanodeWebHdfsMethods {
     case CREATE:
     {
       final Configuration conf = new Configuration(datanode.getConf());
-      final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
       conf.set(FsPermission.UMASK_LABEL, "000");
 
       final int b = bufferSize.getValue(conf);
@@ -221,12 +228,15 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(NamenodeRpcAddressParam.NAME)
+      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+          final NamenodeRpcAddressParam namenodeRpcAddress,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return post(in, ugi, delegation, ROOT, op, bufferSize);
+    return post(in, ugi, delegation, namenodeRpcAddress, ROOT, op, bufferSize);
   }
 
   /** Handle HTTP POST request. */
@@ -239,6 +249,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(NamenodeRpcAddressParam.NAME)
+      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+          final NamenodeRpcAddressParam namenodeRpcAddress,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
@@ -246,7 +259,8 @@ public class DatanodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, path, op, bufferSize);
+    final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
+    init(ugi, delegation, nnRpcAddr, path, op, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -259,7 +273,6 @@ public class DatanodeWebHdfsMethods {
     case APPEND:
     {
       final Configuration conf = new Configuration(datanode.getConf());
-      final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
       final int b = bufferSize.getValue(conf);
       DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
       FSDataOutputStream out = null;
@@ -291,6 +304,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(NamenodeRpcAddressParam.NAME)
+      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+          final NamenodeRpcAddressParam namenodeRpcAddress,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
       @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@@ -300,7 +316,8 @@ public class DatanodeWebHdfsMethods {
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return get(ugi, delegation, ROOT, op, offset, length, bufferSize); 
+    return get(ugi, delegation, namenodeRpcAddress, ROOT, op, offset, length,
+        bufferSize);
   }
 
   /** Handle HTTP GET request. */
@@ -311,6 +328,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
+      @QueryParam(NamenodeRpcAddressParam.NAME)
+      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+          final NamenodeRpcAddressParam namenodeRpcAddress,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
@@ -322,7 +342,8 @@ public class DatanodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
 
-    init(ugi, delegation, path, op, offset, length, bufferSize);
+    final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
+    init(ugi, delegation, nnRpcAddr, path, op, offset, length, bufferSize);
 
     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -331,7 +352,6 @@ public class DatanodeWebHdfsMethods {
     final String fullpath = path.getAbsolutePath();
     final DataNode datanode = (DataNode)context.getAttribute("datanode");
     final Configuration conf = new Configuration(datanode.getConf());
-    final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
 
     switch(op.getValue()) {
     case OPEN:

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1214027&r1=1214026&r2=1214027&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
Wed Dec 14 01:47:57 2011
@@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.hdfs.web.resources.LengthParam;
 import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.hdfs.web.resources.OwnerParam;
@@ -198,6 +199,7 @@ public class NamenodeWebHdfsMethods {
       delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
     }
     final String query = op.toQueryString() + delegationQuery
+        + "&" + new NamenodeRpcAddressParam(namenode)
         + Param.toSortedString("&", parameters);
     final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
 

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java?rev=1214027&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java
(added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java
Wed Dec 14 01:47:57 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+import java.net.InetSocketAddress;
+
+/** InetSocketAddressParam parameter. */
+abstract class InetSocketAddressParam
+    extends Param<InetSocketAddress, InetSocketAddressParam.Domain> {
+  InetSocketAddressParam(final Domain domain, final InetSocketAddress value) {
+    super(domain, value);
+  }
+
+  @Override
+  public String toString() {
+    return getName() + "=" + Domain.toString(getValue());
+  }
+
+  /** The domain of the parameter. */
+  static final class Domain extends Param.Domain<InetSocketAddress> {
+    Domain(final String paramName) {
+      super(paramName);
+    }
+
+    @Override
+    public String getDomain() {
+      return "<HOST:PORT>";
+    }
+
+    @Override
+    InetSocketAddress parse(final String str) {
+      final int i = str.indexOf(':');
+      if (i < 0) {
+        throw new IllegalArgumentException("Failed to parse \"" + str
+            + "\" as " + getDomain() + ": the ':' character not found.");
+      } else if (i == 0) {
+        throw new IllegalArgumentException("Failed to parse \"" + str
+            + "\" as " + getDomain() + ": HOST is empty.");
+      } else if (i == str.length() - 1) {
+        throw new IllegalArgumentException("Failed to parse \"" + str
+            + "\" as " + getDomain() + ": PORT is empty.");
+      }
+
+      final String host = str.substring(0, i);
+      final int port;
+      try {
+        port = Integer.parseInt(str.substring(i + 1));
+      } catch(NumberFormatException e) {
+        throw new IllegalArgumentException("Failed to parse \"" + str
+            + "\" as " + getDomain() + ": the ':' position is " + i
+            + " but failed to parse PORT.", e);
+      }
+
+      try {
+        return new InetSocketAddress(host, port);
+      } catch(Exception e) {
+        throw new IllegalArgumentException("Failed to parse \"" + str
+            + "\": cannot create InetSocketAddress(host=" + host
+            + ", port=" + port + ")", e);
+      }
+    }
+
+    /** Convert an InetSocketAddress to a HOST:PORT String. */
+    static String toString(final InetSocketAddress addr) {
+      return addr.getHostName() + ":" + addr.getPort();
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java?rev=1214027&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java
(added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java
Wed Dec 14 01:47:57 2011
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+/** Namenode RPC address parameter. */
+public class NamenodeRpcAddressParam extends InetSocketAddressParam {
+  /** Parameter name. */
+  public static final String NAME = "namenoderpcaddress";
+  /** Default parameter value. */
+  public static final String DEFAULT = "";
+
+  private static final Domain DOMAIN = new Domain(NAME);
+
+  /**
+   * Constructor.
+   * @param str a string representation of the parameter value.
+   */
+  public NamenodeRpcAddressParam(final String str) {
+    super(DOMAIN, str == null || str.equals(DEFAULT)? null: DOMAIN.parse(str));
+  }
+
+  /**
+   * Construct an object using the RPC address of the given namenode.
+   */
+  public NamenodeRpcAddressParam(final NameNode namenode) {
+    super(DOMAIN, namenode.getNameNodeAddress());
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java?rev=1214027&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java
(added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java
Wed Dec 14 01:47:57 2011
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test WebHDFS with multiple NameNodes
+ */
+public class TestWebHdfsWithMultipleNameNodes {
+  static final Log LOG = WebHdfsTestUtil.LOG;
+
+  static private void setLogLevel() {
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
+
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
+  }
+
+  private static final Configuration conf = new HdfsConfiguration();
+  private static MiniDFSCluster cluster;
+  private static WebHdfsFileSystem[] webhdfs;
+
+  @BeforeClass
+  public static void setupTest() {
+    setLogLevel();
+    try {
+      setupCluster(4, 3);
+    } catch(Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void setupCluster(final int nNameNodes, final int nDataNodes)
+      throws Exception {
+    LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
+
+    conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numNameNodes(nNameNodes)
+        .numDataNodes(nDataNodes)
+        .build();
+    cluster.waitActive();
+    
+    webhdfs = new WebHdfsFileSystem[nNameNodes];
+    for(int i = 0; i < webhdfs.length; i++) {
+      final InetSocketAddress addr = cluster.getNameNode(i).getHttpAddress();
+      final String uri = WebHdfsFileSystem.SCHEME  + "://"
+          + addr.getHostName() + ":" + addr.getPort() + "/";
+      webhdfs[i] = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
+    }
+  }
+
+  @AfterClass
+  public static void shutdownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private static String createString(String prefix, int i) {
+    //The suffix is to make sure the strings have different lengths.
+    final String suffix = "*********************".substring(0, i+1);
+    return prefix + i + suffix + "\n";
+  }
+
+  private static String[] createStrings(String prefix, String name) {
+    final String[] strings = new String[webhdfs.length]; 
+    for(int i = 0; i < webhdfs.length; i++) {
+      strings[i] = createString(prefix, i);
+      LOG.info(name + "[" + i + "] = " + strings[i]);
+    }
+    return strings;
+  }
+
+  @Test
+  public void testRedirect() throws Exception {
+    final String dir = "/testRedirect/";
+    final String filename = "file";
+    final Path p = new Path(dir, filename);
+
+    final String[] writeStrings = createStrings("write to webhdfs ", "write"); 
+    final String[] appendStrings = createStrings("append to webhdfs ", "append"); 
+    
+    //test create: create a file for each namenode
+    for(int i = 0; i < webhdfs.length; i++) {
+      final FSDataOutputStream out = webhdfs[i].create(p);
+      out.write(writeStrings[i].getBytes());
+      out.close();
+    }
+    
+    for(int i = 0; i < webhdfs.length; i++) {
+      //check file length
+      final long expected = writeStrings[i].length();
+      Assert.assertEquals(expected, webhdfs[i].getFileStatus(p).getLen());
+    }
+
+    //test read: check file content for each namenode
+    for(int i = 0; i < webhdfs.length; i++) {
+      final FSDataInputStream in = webhdfs[i].open(p);
+      for(int c, j = 0; (c = in.read()) != -1; j++) {
+        Assert.assertEquals(writeStrings[i].charAt(j), c);
+      }
+      in.close();
+    }
+
+    //test append: append to the file for each namenode
+    for(int i = 0; i < webhdfs.length; i++) {
+      final FSDataOutputStream out = webhdfs[i].append(p);
+      out.write(appendStrings[i].getBytes());
+      out.close();
+    }
+
+    for(int i = 0; i < webhdfs.length; i++) {
+      //check file length
+      final long expected = writeStrings[i].length() + appendStrings[i].length();
+      Assert.assertEquals(expected, webhdfs[i].getFileStatus(p).getLen());
+    }
+
+    //test read: check file content for each namenode
+    for(int i = 0; i < webhdfs.length; i++) {
+      final StringBuilder b = new StringBuilder(); 
+      final FSDataInputStream in = webhdfs[i].open(p);
+      for(int c; (c = in.read()) != -1; ) {
+        b.append((char)c);
+      }
+      final int wlen = writeStrings[i].length();
+      Assert.assertEquals(writeStrings[i], b.substring(0, wlen));
+      Assert.assertEquals(appendStrings[i], b.substring(wlen));
+      in.close();
+    }
+  }
+}



Mime
View raw message